test1451: add SMB support to the testbed

Add test 1451 which does some very basic SMB testing using the impacket
SMB server.

Closes #1630
This commit is contained in:
Max Dymond 2017-07-03 11:00:04 +01:00 committed by Daniel Stenberg
parent f1609155d5
commit a6f8d27efc
7 changed files with 889 additions and 511 deletions

56
tests/curl_test_data.py Executable file
View File

@ -0,0 +1,56 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Project ___| | | | _ \| |
# / __| | | | |_) | |
# | (__| |_| | _ <| |___
# \___|\___/|_| \_\_____|
#
# Copyright (C) 2017, Daniel Stenberg, <daniel@haxx.se>, et al.
#
# This software is licensed as described in the file COPYING, which
# you should have received as part of this distribution. The terms
# are also available at https://curl.haxx.se/docs/copyright.html.
#
# You may opt to use, copy, modify, merge, publish, distribute and/or sell
# copies of the Software, and permit persons to whom the Software is
# furnished to do so, under the terms of the COPYING file.
#
# This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
# KIND, either express or implied.
#
"""Module for extracting test data from the test data folder"""
from __future__ import (absolute_import, division, print_function,
unicode_literals)
import os
import xml.etree.ElementTree as ET
import logging
log = logging.getLogger(__name__)
class TestData(object):
def __init__(self, data_folder):
self.data_folder = data_folder
def get_test_data(self, test_number):
# Create the test file name
filename = os.path.join(self.data_folder,
"test{0}".format(test_number))
# The user should handle the exception from failing to find the file.
tree = ET.parse(filename)
# We need the <reply><data> text.
reply = tree.find("reply")
data = reply.find("data")
# Return the text contents of the data
return data.text
if __name__ == '__main__':
td = TestData("./data")
data = td.get_test_data(1)
print(data)

View File

@ -154,7 +154,7 @@ test1416 test1417 test1418 test1419 test1420 test1421 test1422 test1423 \
test1424 test1425 test1426 \
test1428 test1429 test1430 test1431 test1432 test1433 test1434 test1435 \
test1436 test1437 test1438 test1439 test1440 test1441 test1442 test1443 \
test1444 test1445 test1446 test1450 \
test1444 test1445 test1446 test1450 test1451 \
\
test1500 test1501 test1502 test1503 test1504 test1505 test1506 test1507 \
test1508 test1509 test1510 test1511 test1512 test1513 test1514 test1515 \

36
tests/data/test1451 Normal file
View File

@ -0,0 +1,36 @@
<testcase>
<info>
<keywords>
SMB
</keywords>
</info>
#
# Server-side
<reply>
<data>Basic SMB test complete</data>
</reply>
#
# Client-side
<client>
<server>
smb
</server>
<features>
smb
</features>
<name>
Basic SMB request
</name>
<command>
-u 'curltest:curltest' smb://%HOSTIP:%SMBPORT/TESTS/1451
</command>
</client>
#
# Verify data after the test has been "shot"
<verify>
<stdout>Basic SMB test complete</stdout>
</verify>
</testcase>

View File

@ -38,8 +38,7 @@ import errno
import sys
import random
import shutil
import string
from binascii import unhexlify, hexlify
from binascii import hexlify
# For signing
from impacket import smb, nmb, ntlm, uuid, LOG
@ -4167,250 +4166,3 @@ smb.SMB.TRANS_TRANSACT_NMPIPE :self.__smbTransHandler.transactNamedPipe
# For windows platforms, opening a directory is not an option, so we set a void FD
VOID_FILE_DESCRIPTOR = -1
PIPE_FILE_DESCRIPTOR = -2
######################################################################
# HELPER CLASSES
######################################################################
from impacket.dcerpc.v5.rpcrt import DCERPCServer
from impacket.dcerpc.v5.dtypes import NULL
from impacket.dcerpc.v5.srvs import NetrShareEnum, NetrShareEnumResponse, SHARE_INFO_1, NetrServerGetInfo, NetrServerGetInfoResponse, NetrShareGetInfo, NetrShareGetInfoResponse
from impacket.dcerpc.v5.wkst import NetrWkstaGetInfo, NetrWkstaGetInfoResponse
from impacket.system_errors import ERROR_INVALID_LEVEL
class WKSTServer(DCERPCServer):
def __init__(self):
DCERPCServer.__init__(self)
self.wkssvcCallBacks = {
0: self.NetrWkstaGetInfo,
}
self.addCallbacks(('6BFFD098-A112-3610-9833-46C3F87E345A', '1.0'),'\\PIPE\\wkssvc', self.wkssvcCallBacks)
def NetrWkstaGetInfo(self,data):
request = NetrWkstaGetInfo(data)
self.log("NetrWkstaGetInfo Level: %d" % request['Level'])
answer = NetrWkstaGetInfoResponse()
if request['Level'] not in (100, 101):
answer['ErrorCode'] = ERROR_INVALID_LEVEL
return answer
answer['WkstaInfo']['tag'] = request['Level']
if request['Level'] == 100:
# Windows. Decimal value 500.
answer['WkstaInfo']['WkstaInfo100']['wki100_platform_id'] = 0x000001F4
answer['WkstaInfo']['WkstaInfo100']['wki100_computername'] = NULL
answer['WkstaInfo']['WkstaInfo100']['wki100_langroup'] = NULL
answer['WkstaInfo']['WkstaInfo100']['wki100_ver_major'] = 5
answer['WkstaInfo']['WkstaInfo100']['wki100_ver_minor'] = 0
else:
# Windows. Decimal value 500.
answer['WkstaInfo']['WkstaInfo101']['wki101_platform_id'] = 0x000001F4
answer['WkstaInfo']['WkstaInfo101']['wki101_computername'] = NULL
answer['WkstaInfo']['WkstaInfo101']['wki101_langroup'] = NULL
answer['WkstaInfo']['WkstaInfo101']['wki101_ver_major'] = 5
answer['WkstaInfo']['WkstaInfo101']['wki101_ver_minor'] = 0
answer['WkstaInfo']['WkstaInfo101']['wki101_lanroot'] = NULL
return answer
class SRVSServer(DCERPCServer):
def __init__(self):
DCERPCServer.__init__(self)
self._shares = {}
self.__serverConfig = None
self.__logFile = None
self.srvsvcCallBacks = {
15: self.NetrShareEnum,
16: self.NetrShareGetInfo,
21: self.NetrServerGetInfo,
}
self.addCallbacks(('4B324FC8-1670-01D3-1278-5A47BF6EE188', '3.0'),'\\PIPE\\srvsvc', self.srvsvcCallBacks)
def setServerConfig(self, config):
self.__serverConfig = config
def processConfigFile(self, configFile=None):
if configFile is not None:
self.__serverConfig = ConfigParser.ConfigParser()
self.__serverConfig.read(configFile)
sections = self.__serverConfig.sections()
# Let's check the log file
self.__logFile = self.__serverConfig.get('global','log_file')
if self.__logFile != 'None':
logging.basicConfig(filename = self.__logFile,
level = logging.DEBUG,
format="%(asctime)s: %(levelname)s: %(message)s",
datefmt = '%m/%d/%Y %I:%M:%S %p')
# Remove the global one
del(sections[sections.index('global')])
self._shares = {}
for i in sections:
self._shares[i] = dict(self.__serverConfig.items(i))
def NetrShareGetInfo(self,data):
request = NetrShareGetInfo(data)
self.log("NetrGetShareInfo Level: %d" % request['Level'])
s = request['NetName'][:-1].upper()
answer = NetrShareGetInfoResponse()
if self._shares.has_key(s):
share = self._shares[s]
answer['InfoStruct']['tag'] = 1
answer['InfoStruct']['ShareInfo1']['shi1_netname']= s+'\x00'
answer['InfoStruct']['ShareInfo1']['shi1_type'] = share['share type']
answer['InfoStruct']['ShareInfo1']['shi1_remark'] = share['comment']+'\x00'
answer['ErrorCode'] = 0
else:
answer['InfoStruct']['tag'] = 1
answer['InfoStruct']['ShareInfo1']= NULL
answer['ErrorCode'] = 0x0906 #WERR_NET_NAME_NOT_FOUND
return answer
def NetrServerGetInfo(self,data):
request = NetrServerGetInfo(data)
self.log("NetrServerGetInfo Level: %d" % request['Level'])
answer = NetrServerGetInfoResponse()
answer['InfoStruct']['tag'] = 101
# PLATFORM_ID_NT = 500
answer['InfoStruct']['ServerInfo101']['sv101_platform_id'] = 500
answer['InfoStruct']['ServerInfo101']['sv101_name'] = request['ServerName']
# Windows 7 = 6.1
answer['InfoStruct']['ServerInfo101']['sv101_version_major'] = 6
answer['InfoStruct']['ServerInfo101']['sv101_version_minor'] = 1
# Workstation = 1
answer['InfoStruct']['ServerInfo101']['sv101_type'] = 1
answer['InfoStruct']['ServerInfo101']['sv101_comment'] = NULL
answer['ErrorCode'] = 0
return answer
def NetrShareEnum(self, data):
request = NetrShareEnum(data)
self.log("NetrShareEnum Level: %d" % request['InfoStruct']['Level'])
shareEnum = NetrShareEnumResponse()
shareEnum['InfoStruct']['Level'] = 1
shareEnum['InfoStruct']['ShareInfo']['tag'] = 1
shareEnum['TotalEntries'] = len(self._shares)
shareEnum['InfoStruct']['ShareInfo']['Level1']['EntriesRead'] = len(self._shares)
shareEnum['ErrorCode'] = 0
for i in self._shares:
shareInfo = SHARE_INFO_1()
shareInfo['shi1_netname'] = i+'\x00'
shareInfo['shi1_type'] = self._shares[i]['share type']
shareInfo['shi1_remark'] = self._shares[i]['comment']+'\x00'
shareEnum['InfoStruct']['ShareInfo']['Level1']['Buffer'].append(shareInfo)
return shareEnum
class SimpleSMBServer:
"""
SimpleSMBServer class - Implements a simple, customizable SMB Server
:param string listenAddress: the address you want the server to listen on
:param integer listenPort: the port number you want the server to listen on
:param string configFile: a file with all the servers' configuration. If no file specified, this class will create the basic parameters needed to run. You will need to add your shares manually tho. See addShare() method
"""
def __init__(self, listenAddress = '0.0.0.0', listenPort=445, configFile=''):
if configFile != '':
self.__server = SMBSERVER((listenAddress,listenPort))
self.__server.processConfigFile(configFile)
self.__smbConfig = None
else:
# Here we write a mini config for the server
self.__smbConfig = ConfigParser.ConfigParser()
self.__smbConfig.add_section('global')
self.__smbConfig.set('global','server_name',''.join([random.choice(string.letters) for _ in range(8)]))
self.__smbConfig.set('global','server_os',''.join([random.choice(string.letters) for _ in range(8)])
)
self.__smbConfig.set('global','server_domain',''.join([random.choice(string.letters) for _ in range(8)])
)
self.__smbConfig.set('global','log_file','None')
self.__smbConfig.set('global','rpc_apis','yes')
self.__smbConfig.set('global','credentials_file','')
self.__smbConfig.set('global', 'challenge', "A"*8)
# IPC always needed
self.__smbConfig.add_section('IPC$')
self.__smbConfig.set('IPC$','comment','')
self.__smbConfig.set('IPC$','read only','yes')
self.__smbConfig.set('IPC$','share type','3')
self.__smbConfig.set('IPC$','path','')
self.__server = SMBSERVER((listenAddress,listenPort), config_parser = self.__smbConfig)
self.__server.processConfigFile()
# Now we have to register the MS-SRVS server. This specially important for
# Windows 7+ and Mavericks clients since they WONT (specially OSX)
# ask for shares using MS-RAP.
self.__srvsServer = SRVSServer()
self.__srvsServer.daemon = True
self.__wkstServer = WKSTServer()
self.__wkstServer.daemon = True
self.__server.registerNamedPipe('srvsvc',('127.0.0.1',self.__srvsServer.getListenPort()))
self.__server.registerNamedPipe('wkssvc',('127.0.0.1',self.__wkstServer.getListenPort()))
def start(self):
self.__srvsServer.start()
self.__wkstServer.start()
self.__server.serve_forever()
def registerNamedPipe(self, pipeName, address):
return self.__server.registerNamedPipe(pipeName, address)
def unregisterNamedPipe(self, pipeName):
return self.__server.unregisterNamedPipe(pipeName)
def getRegisteredNamedPipes(self):
return self.__server.getRegisteredNamedPipes()
def addShare(self, shareName, sharePath, shareComment='', shareType = 0, readOnly = 'no'):
self.__smbConfig.add_section(shareName)
self.__smbConfig.set(shareName, 'comment', shareComment)
self.__smbConfig.set(shareName, 'read only', readOnly)
self.__smbConfig.set(shareName, 'share type', shareType)
self.__smbConfig.set(shareName, 'path', sharePath)
self.__server.setServerConfig(self.__smbConfig)
self.__srvsServer.setServerConfig(self.__smbConfig)
self.__server.processConfigFile()
self.__srvsServer.processConfigFile()
def removeShare(self, shareName):
self.__smbConfig.remove_section(shareName)
self.__server.setServerConfig(self.__smbConfig)
self.__srvsServer.setServerConfig(self.__smbConfig)
self.__server.processConfigFile()
self.__srvsServer.processConfigFile()
def setSMBChallenge(self, challenge):
if challenge != '':
self.__smbConfig.set('global', 'challenge', unhexlify(challenge))
self.__server.setServerConfig(self.__smbConfig)
self.__server.processConfigFile()
def setLogFile(self, logFile):
self.__smbConfig.set('global','log_file',logFile)
self.__server.setServerConfig(self.__smbConfig)
self.__server.processConfigFile()
def setCredentialsFile(self, logFile):
self.__smbConfig.set('global','credentials_file',logFile)
self.__server.setServerConfig(self.__smbConfig)
self.__server.processConfigFile()
def setSMB2Support(self, value):
if value is True:
self.__smbConfig.set("global", "SMB2Support", "True")
else:
self.__smbConfig.set("global", "SMB2Support", "False")
self.__server.setServerConfig(self.__smbConfig)
self.__server.processConfigFile()

View File

@ -146,6 +146,8 @@ my $HTTPPIPEPORT; # HTTP pipelining port
my $HTTPUNIXPATH; # HTTP server Unix domain socket path
my $HTTP2PORT; # HTTP/2 server port
my $DICTPORT; # DICT server port
my $SMBPORT; # SMB server port
my $SMBSPORT; # SMBS server port
my $srcdir = $ENV{'srcdir'} || '.';
my $CURL="../src/curl".exe_ext(); # what curl executable to run on the tests
@ -380,7 +382,7 @@ sub init_serverpidfile_hash {
}
}
for my $proto (('tftp', 'sftp', 'socks', 'ssh', 'rtsp', 'gopher', 'httptls',
'dict')) {
'dict', 'smb', 'smbs')) {
for my $ipvnum ((4, 6)) {
for my $idnum ((1, 2)) {
my $serv = servername_id($proto, $ipvnum, $idnum);
@ -1120,6 +1122,67 @@ sub verifysocks {
return $pid;
}
#######################################################################
# Verify that the server that runs on $ip, $port is our server. This also
# implies that we can speak with it, as there might be occasions when the
# server runs fine but we cannot talk to it ("Failed to connect to ::1: Can't
# assign requested address")
#
sub verifysmb {
my ($proto, $ipvnum, $idnum, $ip, $port) = @_;
my $server = servername_id($proto, $ipvnum, $idnum);
my $pid = 0;
my $time=time();
my $extra="";
my $verifylog = "$LOGDIR/".
servername_canon($proto, $ipvnum, $idnum) .'_verify.log';
unlink($verifylog) if(-f $verifylog);
my $flags = "--max-time $server_response_maxtime ";
$flags .= "--silent ";
$flags .= "--verbose ";
$flags .= "--globoff ";
$flags .= "-u 'curltest:curltest' ";
$flags .= $extra;
$flags .= "\"$proto://$ip:$port/SERVER/verifiedserver\"";
my $cmd = "$VCURL $flags 2>$verifylog";
# check if this is our server running on this port:
logmsg "RUN: $cmd\n" if($verbose);
my @data = runclientoutput($cmd);
my $res = $? >> 8; # rotate the result
if($res & 128) {
logmsg "RUN: curl command died with a coredump\n";
return -1;
}
foreach my $line (@data) {
if($line =~ /WE ROOLZ: (\d+)/) {
# this is our test server with a known pid!
$pid = 0+$1;
last;
}
}
if($pid <= 0 && @data && $data[0]) {
# this is not a known server
logmsg "RUN: Unknown server on our $server port: $port\n";
return 0;
}
# we can/should use the time it took to verify the server as a measure
# on how fast/slow this host is.
my $took = int(0.5+time()-$time);
if($verbose) {
logmsg "RUN: Verifying our test $server server took $took seconds\n";
}
$ftpchecktime = $took>=1?$took:1; # make sure it never is below 1
return $pid;
}
#######################################################################
# Verify that the server that runs on $ip, $port is our server.
# Retry over several seconds before giving up. The ssh server in
@ -1146,7 +1209,8 @@ my %protofunc = ('http' => \&verifyhttp,
'socks' => \&verifysocks,
'gopher' => \&verifyhttp,
'httptls' => \&verifyhttptls,
'dict' => \&verifyftp);
'dict' => \&verifyftp,
'smb' => \&verifysmb);
sub verifyserver {
my ($proto, $ipvnum, $idnum, $ip, $port) = @_;
@ -2243,6 +2307,83 @@ sub rundictserver {
return ($dictpid, $pid2);
}
#######################################################################
# start the SMB server
#
sub runsmbserver {
my ($verbose, $alt, $port) = @_;
my $proto = "smb";
my $ip = $HOSTIP;
my $ipvnum = 4;
my $idnum = 1;
my $server;
my $srvrname;
my $pidfile;
my $logfile;
my $flags = "";
if($alt eq "ipv6") {
# No IPv6
}
$server = servername_id($proto, $ipvnum, $idnum);
$pidfile = $serverpidfile{$server};
# don't retry if the server doesn't work
if ($doesntrun{$pidfile}) {
return (0,0);
}
my $pid = processexists($pidfile);
if($pid > 0) {
stopserver($server, "$pid");
}
unlink($pidfile) if(-f $pidfile);
$srvrname = servername_str($proto, $ipvnum, $idnum);
$logfile = server_logfilename($LOGDIR, $proto, $ipvnum, $idnum);
$flags .= "--verbose 1 " if($debugprotocol);
$flags .= "--pidfile \"$pidfile\" --logfile \"$logfile\" ";
$flags .= "--id $idnum " if($idnum > 1);
$flags .= "--port $port --srcdir \"$srcdir\"";
my $cmd = "$srcdir/smbserver.py $flags";
my ($smbpid, $pid2) = startnew($cmd, $pidfile, 15, 0);
if($smbpid <= 0 || !pidexists($smbpid)) {
# it is NOT alive
logmsg "RUN: failed to start the $srvrname server\n";
stopserver($server, "$pid2");
displaylogs($testnumcheck);
$doesntrun{$pidfile} = 1;
return (0,0);
}
# Server is up. Verify that we can speak to it.
my $pid3 = verifyserver($proto, $ipvnum, $idnum, $ip, $port);
if(!$pid3) {
logmsg "RUN: $srvrname server failed verification\n";
# failed to talk to it properly. Kill the server and return failure
stopserver($server, "$smbpid $pid2");
displaylogs($testnumcheck);
$doesntrun{$pidfile} = 1;
return (0,0);
}
$pid2 = $pid3;
if($verbose) {
logmsg "RUN: $srvrname server is now running PID $smbpid\n";
}
sleep(1);
return ($smbpid, $pid2);
}
#######################################################################
# Single shot http and gopher server responsiveness test. This should only
# be used to verify that a server present in %run hash is still functional
@ -2843,6 +2984,9 @@ sub subVariables {
$$thing =~ s/%DICTPORT/$DICTPORT/g;
$$thing =~ s/%SMBPORT/$SMBPORT/g;
$$thing =~ s/%SMBSPORT/$SMBSPORT/g;
# server Unix domain socket paths
$$thing =~ s/%HTTPUNIXPATH/$HTTPUNIXPATH/g;
@ -4695,6 +4839,17 @@ sub startservers {
$run{'dict'}="$pid $pid2";
}
}
elsif($what eq "smb") {
if(!$run{'smb'}) {
($pid, $pid2) = runsmbserver($verbose, "", $SMBPORT);
if($pid <= 0) {
return "failed starting SMB server";
}
logmsg sprintf ("* pid SMB => %d %d\n", $pid, $pid2)
if($verbose);
$run{'dict'}="$pid $pid2";
}
}
elsif($what eq "none") {
logmsg "* starts no server\n" if ($verbose);
}
@ -5155,6 +5310,8 @@ $HTTPPROXYPORT = $base++; # HTTP proxy port, when using CONNECT
$HTTPPIPEPORT = $base++; # HTTP pipelining port
$HTTP2PORT = $base++; # HTTP/2 port
$DICTPORT = $base++; # DICT port
$SMBPORT = $base++; # SMB port
$SMBSPORT = $base++; # SMBS port
$HTTPUNIXPATH = 'http.sock'; # HTTP server Unix domain socket path
#######################################################################

View File

@ -105,7 +105,7 @@ sub servername_str {
$proto = uc($proto) if($proto);
die "unsupported protocol: '$proto'" unless($proto &&
($proto =~ /^(((FTP|HTTP|HTTP\/2|IMAP|POP3|SMTP|HTTP-PIPE)S?)|(TFTP|SFTP|SOCKS|SSH|RTSP|GOPHER|HTTPTLS|DICT))$/));
($proto =~ /^(((FTP|HTTP|HTTP\/2|IMAP|POP3|SMTP|HTTP-PIPE)S?)|(TFTP|SFTP|SOCKS|SSH|RTSP|GOPHER|HTTPTLS|DICT|SMB|SMBS))$/));
$ipver = (not $ipver) ? 'ipv4' : lc($ipver);
die "unsupported IP version: '$ipver'" unless($ipver &&

377
tests/smbserver.py Executable file
View File

@ -0,0 +1,377 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Project ___| | | | _ \| |
# / __| | | | |_) | |
# | (__| |_| | _ <| |___
# \___|\___/|_| \_\_____|
#
# Copyright (C) 2017, Daniel Stenberg, <daniel@haxx.se>, et al.
#
# This software is licensed as described in the file COPYING, which
# you should have received as part of this distribution. The terms
# are also available at https://curl.haxx.se/docs/copyright.html.
#
# You may opt to use, copy, modify, merge, publish, distribute and/or sell
# copies of the Software, and permit persons to whom the Software is
# furnished to do so, under the terms of the COPYING file.
#
# This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
# KIND, either express or implied.
#
"""Server for testing SMB"""
from __future__ import (absolute_import, division, print_function)
# unicode_literals)
import argparse
import ConfigParser
import os
import sys
import logging
import tempfile
# Import our curl test data helper
import curl_test_data
# This saves us having to set up the PYTHONPATH explicitly
deps_dir = os.path.join(os.path.dirname(__file__), "python_dependencies")
sys.path.append(deps_dir)
from impacket import smbserver as imp_smbserver
from impacket import smb as imp_smb
from impacket.nt_errors import (STATUS_ACCESS_DENIED, STATUS_SUCCESS,
STATUS_NO_SUCH_FILE)
log = logging.getLogger(__name__)
SERVER_MAGIC = "SERVER_MAGIC"
TESTS_MAGIC = "TESTS_MAGIC"
VERIFIED_REQ = "verifiedserver"
VERIFIED_RSP = b"WE ROOLZ: {pid}\n"
def smbserver(options):
"""Start up a TCP SMB server that serves forever
"""
if options.pidfile:
pid = os.getpid()
with open(options.pidfile, "w") as f:
f.write("{0}".format(pid))
# Here we write a mini config for the server
smb_config = ConfigParser.ConfigParser()
smb_config.add_section("global")
smb_config.set("global", "server_name", "SERVICE")
smb_config.set("global", "server_os", "UNIX")
smb_config.set("global", "server_domain", "WORKGROUP")
smb_config.set("global", "log_file", "")
smb_config.set("global", "credentials_file", "")
# We need a share which allows us to test that the server is running
smb_config.add_section("SERVER")
smb_config.set("SERVER", "comment", "server function")
smb_config.set("SERVER", "read only", "yes")
smb_config.set("SERVER", "share type", "0")
smb_config.set("SERVER", "path", SERVER_MAGIC)
# Have a share for tests. These files will be autogenerated from the
# test input.
smb_config.add_section("TESTS")
smb_config.set("TESTS", "comment", "tests")
smb_config.set("TESTS", "read only", "yes")
smb_config.set("TESTS", "share type", "0")
smb_config.set("TESTS", "path", TESTS_MAGIC)
if not options.srcdir or not os.path.isdir(options.srcdir):
raise ScriptException("--srcdir is mandatory")
test_data_dir = os.path.join(options.srcdir, "data")
smb_server = TestSmbServer(("127.0.0.1", options.port),
config_parser=smb_config,
test_data_directory=test_data_dir)
log.info("[SMB] setting up SMB server on port %s", options.port)
smb_server.processConfigFile()
smb_server.serve_forever()
return 0
class TestSmbServer(imp_smbserver.SMBSERVER):
"""
Test server for SMB which subclasses the impacket SMBSERVER and provides
test functionality.
"""
def __init__(self,
address,
config_parser=None,
test_data_directory=None):
imp_smbserver.SMBSERVER.__init__(self,
address,
config_parser=config_parser)
# Set up a test data object so we can get test data later.
self.ctd = curl_test_data.TestData(test_data_directory)
# Override smbComNtCreateAndX so we can pretend to have files which
# don't exist.
self.hookSmbCommand(imp_smb.SMB.SMB_COM_NT_CREATE_ANDX,
self.create_and_x)
def create_and_x(self, conn_id, smb_server, smb_command, recv_packet):
"""
Our version of smbComNtCreateAndX looks for special test files and
fools the rest of the framework into opening them as if they were
normal files.
"""
conn_data = smb_server.getConnectionData(conn_id)
# Wrap processing in a try block which allows us to throw SmbException
# to control the flow.
try:
ncax_parms = imp_smb.SMBNtCreateAndX_Parameters(
smb_command["Parameters"])
path = self.get_share_path(conn_data,
ncax_parms["RootFid"],
recv_packet["Tid"])
log.info("[SMB] Requested share path: %s", path)
disposition = ncax_parms["Disposition"]
log.debug("[SMB] Requested disposition: %s", disposition)
# Currently we only support reading files.
if disposition != imp_smb.FILE_OPEN:
raise SmbException(STATUS_ACCESS_DENIED,
"Only support reading files")
# Check to see if the path we were given is actually a
# magic path which needs generating on the fly.
if path not in [SERVER_MAGIC, TESTS_MAGIC]:
# Pass the command onto the original handler.
return imp_smbserver.SMBCommands.smbComNtCreateAndX(conn_id,
smb_server,
smb_command,
recv_packet)
flags2 = recv_packet["Flags2"]
ncax_data = imp_smb.SMBNtCreateAndX_Data(flags=flags2,
data=smb_command[
"Data"])
requested_file = imp_smbserver.decodeSMBString(
flags2,
ncax_data["FileName"])
log.debug("[SMB] User requested file '%s'", requested_file)
if path == SERVER_MAGIC:
fid, full_path = self.get_server_path(requested_file)
else:
assert (path == TESTS_MAGIC)
fid, full_path = self.get_test_path(requested_file)
resp_parms = imp_smb.SMBNtCreateAndXResponse_Parameters()
resp_data = ""
# Simple way to generate a fid
if len(conn_data["OpenedFiles"]) == 0:
fakefid = 1
else:
fakefid = conn_data["OpenedFiles"].keys()[-1] + 1
resp_parms["Fid"] = fakefid
resp_parms["CreateAction"] = disposition
if os.path.isdir(path):
resp_parms[
"FileAttributes"] = imp_smb.SMB_FILE_ATTRIBUTE_DIRECTORY
resp_parms["IsDirectory"] = 1
else:
resp_parms["IsDirectory"] = 0
resp_parms["FileAttributes"] = ncax_parms["FileAttributes"]
# Get this file's information
resp_info, error_code = imp_smbserver.queryPathInformation(
"", full_path, level=imp_smb.SMB_QUERY_FILE_ALL_INFO)
if error_code != STATUS_SUCCESS:
raise SmbException(error_code, "Failed to query path info")
resp_parms["CreateTime"] = resp_info["CreationTime"]
resp_parms["LastAccessTime"] = resp_info[
"LastAccessTime"]
resp_parms["LastWriteTime"] = resp_info["LastWriteTime"]
resp_parms["LastChangeTime"] = resp_info[
"LastChangeTime"]
resp_parms["FileAttributes"] = resp_info[
"ExtFileAttributes"]
resp_parms["AllocationSize"] = resp_info[
"AllocationSize"]
resp_parms["EndOfFile"] = resp_info["EndOfFile"]
# Let's store the fid for the connection
# smbServer.log("Create file %s, mode:0x%x" % (pathName, mode))
conn_data["OpenedFiles"][fakefid] = {}
conn_data["OpenedFiles"][fakefid]["FileHandle"] = fid
conn_data["OpenedFiles"][fakefid]["FileName"] = path
conn_data["OpenedFiles"][fakefid]["DeleteOnClose"] = False
except SmbException as s:
log.debug("[SMB] SmbException hit: %s", s)
error_code = s.error_code
resp_parms = ""
resp_data = ""
resp_cmd = imp_smb.SMBCommand(imp_smb.SMB.SMB_COM_NT_CREATE_ANDX)
resp_cmd["Parameters"] = resp_parms
resp_cmd["Data"] = resp_data
smb_server.setConnectionData(conn_id, conn_data)
return [resp_cmd], None, error_code
def get_share_path(self, conn_data, root_fid, tid):
conn_shares = conn_data["ConnectedShares"]
if tid in conn_shares:
if root_fid > 0:
# If we have a rootFid, the path is relative to that fid
path = conn_data["OpenedFiles"][root_fid]["FileName"]
log.debug("RootFid present %s!" % path)
else:
if "path" in conn_shares[tid]:
path = conn_shares[tid]["path"]
else:
raise SmbException(STATUS_ACCESS_DENIED,
"Connection share had no path")
else:
raise SmbException(imp_smbserver.STATUS_SMB_BAD_TID,
"TID was invalid")
return path
def get_server_path(self, requested_filename):
log.debug("[SMB] Get server path '%s'", requested_filename)
if requested_filename not in [VERIFIED_REQ]:
raise SmbException(STATUS_NO_SUCH_FILE, "Couldn't find the file")
fid, filename = tempfile.mkstemp()
log.debug("[SMB] Created %s (%d) for storing '%s'",
filename, fid, requested_filename)
contents = ""
if requested_filename == VERIFIED_REQ:
log.debug("[SMB] Verifying server is alive")
contents = VERIFIED_RSP.format(pid=os.getpid())
self.write_to_fid(fid, contents)
return fid, filename
def write_to_fid(self, fid, contents):
# Write the contents to file descriptor
os.write(fid, contents)
os.fsync(fid)
# Rewind the file to the beginning so a read gets us the contents
os.lseek(fid, 0, os.SEEK_SET)
def get_test_path(self, requested_filename):
log.info("[SMB] Get reply data from 'test%s'", requested_filename)
fid, filename = tempfile.mkstemp()
log.debug("[SMB] Created %s (%d) for storing test '%s'",
filename, fid, requested_filename)
try:
contents = self.ctd.get_test_data(requested_filename)
self.write_to_fid(fid, contents)
return fid, filename
except Exception:
log.exception("Failed to make test file")
raise SmbException(STATUS_NO_SUCH_FILE, "Failed to make test file")
class SmbException(Exception):
def __init__(self, error_code, error_message):
super(SmbException, self).__init__(error_message)
self.error_code = error_code
class ScriptRC(object):
"""Enum for script return codes"""
SUCCESS = 0
FAILURE = 1
EXCEPTION = 2
class ScriptException(Exception):
pass
def get_options():
parser = argparse.ArgumentParser()
parser.add_argument("--port", action="store", default=9017,
type=int, help="port to listen on")
parser.add_argument("--verbose", action="store", type=int, default=0,
help="verbose output")
parser.add_argument("--pidfile", action="store",
help="file name for the PID")
parser.add_argument("--logfile", action="store",
help="file name for the log")
parser.add_argument("--srcdir", action="store", help="test directory")
parser.add_argument("--id", action="store", help="server ID")
parser.add_argument("--ipv4", action="store_true", default=0,
help="IPv4 flag")
return parser.parse_args()
def setup_logging(options):
"""
Set up logging from the command line options
"""
root_logger = logging.getLogger()
add_stdout = False
formatter = logging.Formatter("%(asctime)s %(levelname)-5.5s %(message)s")
# Write out to a logfile
if options.logfile:
handler = logging.FileHandler(options.logfile, mode="w")
handler.setFormatter(formatter)
handler.setLevel(logging.DEBUG)
root_logger.addHandler(handler)
else:
# The logfile wasn't specified. Add a stdout logger.
add_stdout = True
if options.verbose:
# Add a stdout logger as well in verbose mode
root_logger.setLevel(logging.DEBUG)
add_stdout = True
else:
root_logger.setLevel(logging.INFO)
if add_stdout:
stdout_handler = logging.StreamHandler(sys.stdout)
stdout_handler.setFormatter(formatter)
stdout_handler.setLevel(logging.DEBUG)
root_logger.addHandler(stdout_handler)
if __name__ == '__main__':
# Get the options from the user.
options = get_options()
# Setup logging using the user options
setup_logging(options)
# Run main script.
try:
rc = smbserver(options)
except Exception as e:
log.exception(e)
rc = ScriptRC.EXCEPTION
log.info("[SMB] Returning %d", rc)
sys.exit(rc)