import os
import string
import adodbapi
import unittest
_computername=r"(local)"
_database='unittest'
_uid="sa"
_pwd=""
constr="Provider=SQLOLEDB.1; User ID=%s;Password=%s; Initial Catalog=%s; Data Source=%s" % (_uid,
_pwd,
_database,
_computername)
class TestTSQLUnit(unittest.TestCase):
def setUp(self):
self.conn=adodbapi.connect(constr)
self.crsr=self.conn.cursor()
self.crsr.execute("select count(*) from sysobjects where xtype='P' and name LIKE 'ut[_]%' ")
assert self.crsr.fetchone()[0]==0,"""Error when setting up testcases, old testcases in database.
Please remove testcases starting with ut and underscore"""
def testDescribe(self):
countSetupAndTearDowns=0
testsNotInTestSuite=['ut_someTest1','ut_someTest2']
testsInTestSuite=['ut_someTestSuite_someTest1',
'ut_someTestSuite_anotherTest',
'ut_someTestSuite_aThirdTest']
testsInTestSuiteWithSetup=['ut_testsuite2_setup',
'ut_testsuite2_anotherTest',
'ut_testsuite2_aThirdTest']
countSetupAndTearDowns+=1
testsInTestSuiteWithSetupAndTearDown=['ut_ts3_setup',
'ut_ts3_test',
'ut_ts3_tearDown']
countSetupAndTearDowns+=2
testsInTestSuiteWithTearDown=['ut_suite4_test',
'ut_suite4_TEARDOWN']
countSetupAndTearDowns+=1
allTests= testsNotInTestSuite + testsInTestSuite + testsInTestSuiteWithSetup + \
testsInTestSuiteWithSetupAndTearDown + testsInTestSuiteWithTearDown
template= """
CREATE PROCEDURE %s AS
BEGIN
PRINT 'I was generated by the test_tsqlunit.py testcase, you can safely delete me'
END
"""
for spName in allTests:
self.crsr.execute(template % spName)
self.crsr.callproc("tsu_describe")
rs=self.crsr.fetchall()
assert len(rs) == len(allTests)-countSetupAndTearDowns
for test,testsuite,hasSetup,hasTeardown in rs:
assert test in allTests
assert string.upper(test[-5:])<>'SETUP'
assert string.upper(test[-8:])<>'TEARDOWN'
testdata=('',0,0)
if test in testsInTestSuite:
testdata = ('someTestSuite',0,0)
if test in testsInTestSuiteWithSetup:
testdata = ('testsuite2',1,0)
if test in testsInTestSuiteWithSetupAndTearDown:
testdata = ('ts3',1,1)
if test in testsInTestSuiteWithTearDown:
testdata = ('suite4',0,1)
oksuite,okhasSetUp,okhasTeardown = testdata
assert testsuite == oksuite
assert hasSetup == okhasSetUp
assert hasTeardown == okhasTeardown
def testOneRunOneFailure(self):
proc = """
CREATE PROCEDURE ut_aFailingTest AS
BEGIN
PRINT 'I was generated by the test_tsqlunit.py testcase, you can safely delete me'
EXEC tsu_failure 'a failing message'
END
"""
self.crsr.execute(proc)
success,testcount,failures,errors = self.callRunTestsReturnOutputParameters()
assert success==0
assert testcount==1
assert failures==1
assert errors==0
def testSeveralRuns(self):
nrOfFailures=5
proc = """
CREATE PROCEDURE ut_aFailingTest%i AS
BEGIN
PRINT 'I was generated by the test_tsqlunit.py testcase, you can safely delete me'
EXEC tsu_failure 'a failing message'
END
"""
for nr in range(nrOfFailures):
self.crsr.execute(proc % nr)
nrOfErrors=3
proc = """
CREATE PROCEDURE ut_testWithError%i AS
BEGIN
PRINT 'I was generated by the test_tsqlunit.py testcase, you can safely delete me'
SELECT 1/0 AS ThisWillBeAnError
END
"""
for nr in range(nrOfErrors):
self.crsr.execute(proc % nr)
success,testcount,failures,errors = self.callRunTestsReturnOutputParameters()
assert success == 0
assert testcount == nrOfErrors + nrOfFailures
assert failures == nrOfFailures
assert errors == nrOfErrors
def testRunNoTestcases(self):
success,testcount,failures,errors = self.callRunTestsReturnOutputParameters()
assert success == 1
assert testcount == 0
assert failures == 0
assert errors == 0
def testOneRunOneError(self):
proc = """
CREATE PROCEDURE ut_testWithError AS
BEGIN
PRINT 'I was generated by the test_tsqlunit.py testcase, you can safely delete me'
SELECT 1/0 AS ThisWillBeAnError
END
"""
self.crsr.execute(proc)
success,testcount,failures,errors = self.callRunTestsReturnOutputParameters()
assert success==0
assert testcount==1
assert failures==0
assert errors==1
def testThatTestcasesRunsInATransaction(self):
otherConn=adodbapi.connect(constr)
otherCrsr=otherConn.cursor()
maybeDropTableStatment="""
IF EXISTS(SELECT name
FROM sysobjects
WHERE name = N'tempDeleteMe'
AND type = 'U')
DROP TABLE tempDeleteMe
"""
maybeDropProcStatment="""
IF EXISTS(SELECT name
FROM sysobjects
WHERE name = N'ut_evilTestCase'
AND type = 'P')
DROP PROCEDURE ut_evilTestCase
"""
createTableStatement= """
CREATE TABLE tempDeleteMe (
aColumn varchar(500) NULL)
"""
insertStatement="""
INSERT INTO tempDeleteMe(aColumn)
VALUES('Important data, should not be deleted by the testcase')
"""
otherCrsr.execute(maybeDropTableStatment)
otherCrsr.execute(createTableStatement)
otherCrsr.execute(insertStatement)
otherConn.commit()
otherConn.close()
otherConn=None
otherCrsr=None
evilproc="""
CREATE PROCEDURE ut_evilTestCase AS
TRUNCATE TABLE tempDeleteMe
"""
self.crsr.execute(evilproc)
self.crsr.callproc('tsu_runTests')
self.conn.commit()
self.crsr.execute("SELECT aColumn FROM tempDeleteMe")
rs=self.crsr.fetchone()
assert rs[0]=='Important data, should not be deleted by the testcase'
#clean Up
otherConn=adodbapi.connect(constr)
otherCrsr=otherConn.cursor()
otherCrsr.execute(maybeDropTableStatment)
otherCrsr.execute(maybeDropProcStatment)
otherConn.commit()
def callRunTestsReturnOutputParameters(self):
# According to Knowledge base article Q313861 a recordset will not return to ADO if the
# stored procedure fails with an severe error. As a work around, the result of the
# last test is saved to the table tsuLastTestResult
try:
retval=self.crsr.callproc("tsu_runTests")
except:
pass #ADO aborts when the error occurs, ignore
self.crsr.execute("SELECT success, testCount, failureCount, errorCount FROM tsuLastTestResult")
rs=self.crsr.fetchone()
return rs
def testCreateTestResult(self):
countQuestion="SELECT COUNT(*) FROM tsuTestResults"
self.crsr.execute(countQuestion)
before=self.crsr.fetchone()[0]
self.crsr.callproc("tsu__private_createTestResult")
self.crsr.execute(countQuestion)
after=self.crsr.fetchone()[0]
assert after-before == 1
def printTestProcedures(self):
self.crsr.execute("select name from sysobjects where xtype='P' and name LIKE 'ut[_]%' ")
self.printRs()
def printRs(self,rs=None):
if rs==None:
rs=self.crsr.fetchall()
print 'Recordset:'
for row in rs:
print row
print '-------'
def testASuite(self):
setupproc="""
CREATE PROCEDURE ut_mysuite_setup AS
BEGIN
CREATE TABLE aTableCommonToTheTests ( aColumn varchar(500) NULL)
INSERT INTO aTableCommonToTheTests(aColumn) VALUES('Some value')
END"""
self.crsr.execute(setupproc)
procOne="""
CREATE PROCEDURE ut_mysuite_test1 AS
IF (1 <> (SELECT COUNT(*) FROM aTableCommonToTheTests))
EXEC tsu_failure 'There should be one row in the common table'
"""
self.crsr.execute(procOne)
procTwo="""
CREATE PROCEDURE ut_mysuite_test2 AS
IF (1 <> (SELECT COUNT(*) FROM aTableCommonToTheTests))
EXEC tsu_failure 'There should be one row in the common table'
"""
self.crsr.execute(procTwo)
success,testcount,failures,errors = self.callRunTestsReturnOutputParameters()
assert success==1
assert testcount==2
assert failures==0
assert errors==0
def testTearDownIsExecuted(self):
procOne="""
CREATE PROCEDURE ut_mysuite_test1 AS
PRINT 'I Do nothing'
IF (1 <> (SELECT COUNT(*) FROM aTableCommonToTheTests))
EXEC tsu_failure 'There should be one row in the common table'
"""
self.crsr.execute(procOne)
procTearDown="""
CREATE PROCEDURE ut_mysuite_tEaRdOwN AS
EXEC master.dbo.xp_cmdShell 'type "deleteme">C:\deleteme.txt'
"""
self.crsr.execute(procTearDown)
lst=os.listdir('C:\\')
assert 'deleteme.txt' not in lst
self.callRunTestsReturnOutputParameters()
lst=os.listdir('C:\\')
assert 'deleteme.txt' in lst
os.remove('C:\\deleteme.txt')
def testCookbookExampleOne(self):
proc="""
CREATE PROCEDURE ut_testCapitalize AS
BEGIN
DECLARE @outStr VARCHAR(500)
EXEC capitalize 'a string', @outStr OUT
IF (@outStr <> 'A string') OR @outStr IS NULL
EXEC tsu_failure 'Capitalize should make the first character uppercase'
END
"""
self.crsr.execute(proc)
success,testcount,failures,errors = self.callRunTestsReturnOutputParameters()
assert success==0
assert testcount==1
assert failures==1
assert errors==0
def testCookbookExampleTwo(self):
proc="""
CREATE PROCEDURE ut_testCapitalize AS
BEGIN
DECLARE @outStr VARCHAR(500)
EXEC capitalize 'a string', @outStr OUT
IF (ASCII(LEFT(@outStr,1)) <> ASCII('A')) OR @outStr IS NULL
EXEC tsu_failure 'Capitalize should make the first character uppercase'
END
"""
self.crsr.execute(proc)
proc="""
CREATE PROCEDURE capitalize @inStr VARCHAR(500), @outStr VARCHAR(500) OUT AS
BEGIN
SET @outStr='A string'
END
"""
self.crsr.execute(proc)
success,testcount,failures,errors = self.callRunTestsReturnOutputParameters()
assert success==1
assert testcount==1
assert failures==0
assert errors==0
proc="""
ALTER PROCEDURE capitalize @inStr VARCHAR(500), @outStr VARCHAR(500) OUT AS
BEGIN
SET @outStr='a string'
END
"""
self.crsr.execute(proc)
success,testcount,failures,errors = self.callRunTestsReturnOutputParameters()
assert success==0
assert testcount==1
assert failures==1
assert errors==0
def testThatAlterWorks(self):
proc="""
CREATE PROCEDURE ut_test AS
EXEC tsu_failure 'failuremsg'
"""
self.crsr.execute(proc)
success,testcount,failures,errors = self.callRunTestsReturnOutputParameters()
assert success==0
assert testcount==1
assert failures==1
assert errors==0
proc="""
ALTER PROCEDURE ut_test AS
PRINT 'I am running but not failing'
"""
self.crsr.execute(proc)
success,testcount,failures,errors = self.callRunTestsReturnOutputParameters()
assert success==1
assert testcount==1
assert failures==0
assert errors==0
def tearDown(self):
self.conn.rollback()
self.conn.close()
self.conn=None
self.crsr=None
class VisibleTestTSQLUnit(unittest.TestCase):
sqlInputFileName="osqlinput.txt"
sqlOutputFileName="osqloutput.txt"
nameOfSuite="TheNameOfTheSuite"
procedureNames=('ut_' + nameOfSuite + '_theFirstTest',
'ut_' + nameOfSuite + '_theSecondTest',
'ut_' + nameOfSuite + '_theThirdTest')
def testFeedbackFailure(self):
failureMessage="The failure message looks like this."
sql="""
CREATE PROCEDURE %s AS
EXEC tsu_failure '%s'
GO
CREATE PROCEDURE %s AS
PRINT 'Something'
GO
CREATE PROCEDURE %s AS
PRINT 'Something'
GO
EXEC tsu_runTests
GO
""" % (self.procedureNames[0],
failureMessage,
self.procedureNames[1],
self.procedureNames[2])
output=self.runSQLthroughOSQL(sql)
#Look for important things in the visible output
lookfor=[ failureMessage,
"Test: " + self.procedureNames[0],
"3 tests, of which 1 failed and 0 had an error.",
"Testsuite: " + self.nameOfSuite + " (3 tests)",
"FAILURE!" ]
for findThis in lookfor:
assert string.find(output,findThis) > -1,findThis
def testFeedbackOk(self):
sql="""
CREATE PROCEDURE %s AS
PRINT 'The print statement shows up'
GO
CREATE PROCEDURE %s AS
PRINT 'Something'
GO
CREATE PROCEDURE %s AS
PRINT 'Something'
GO
EXEC tsu_runTests
GO
""" % self.procedureNames
output=self.runSQLthroughOSQL(sql)
#Look for important things in the visible output
lookfor=[ "3 tests, of which 0 failed and 0 had an error.",
"SUCCESS!",
"The print statement shows up"]
for findThis in lookfor:
assert string.find(output,findThis) > -1, findThis
def runSQLthroughOSQL(self,sql):
inputFile=open(self.sqlInputFileName,"w")
inputFile.write(sql)
inputFile.close()
cmdline= "osql -U %s -P %s -S %s -d %s -i %s -o %s -w 500" % ( _uid ,_pwd ,_computername, _database,
self.sqlInputFileName, self.sqlOutputFileName)
os.system(cmdline)
outputFile=open(self.sqlOutputFileName,"r")
f=outputFile.read()
outputFile.close()
return f
def tearDown(self):
try:
s=""
for proc in self.procedureNames:
s= s + " DROP PROCEDURE " + proc + "\n GO \n"
self.runSQLthroughOSQL(s)
except:
pass
try:
os.remove(self.sqlInputFileName)
except:
pass
try:
os.remove(self.sqlOutputFileName)
except:
pass
if __name__ == '__main__':
unittest.main()