import os
import string
import adodbapi
import unittest

_computername=r"(local)"
_database='unittest'
_uid="sa"
_pwd=""
constr="Provider=SQLOLEDB.1; User ID=%s;Password=%s; Initial Catalog=%s; Data Source=%s" % (_uid,
                                                                                            _pwd,
                                                                                            _database,
                                                                                            _computername)



class TestTSQLUnit(unittest.TestCase):
    def setUp(self):
        self.conn=adodbapi.connect(constr)
        self.crsr=self.conn.cursor()
        
        self.crsr.execute("select count(*) from sysobjects where xtype='P' and name LIKE 'ut[_]%' ")
        assert self.crsr.fetchone()[0]==0,"""Error when setting up testcases, old testcases in database.
        Please remove testcases starting with ut and underscore"""

    def testDescribe(self):
        countSetupAndTearDowns=0
        testsNotInTestSuite=['ut_someTest1','ut_someTest2']
        testsInTestSuite=['ut_someTestSuite_someTest1',
                          'ut_someTestSuite_anotherTest',
                          'ut_someTestSuite_aThirdTest']       
        testsInTestSuiteWithSetup=['ut_testsuite2_setup',
                              'ut_testsuite2_anotherTest',
                              'ut_testsuite2_aThirdTest']
        countSetupAndTearDowns+=1

        testsInTestSuiteWithSetupAndTearDown=['ut_ts3_setup',
                                              'ut_ts3_test',
                                              'ut_ts3_tearDown']
        countSetupAndTearDowns+=2
        testsInTestSuiteWithTearDown=['ut_suite4_test',
                                      'ut_suite4_TEARDOWN']
        countSetupAndTearDowns+=1
        
        allTests= testsNotInTestSuite +  testsInTestSuite + testsInTestSuiteWithSetup + \
                  testsInTestSuiteWithSetupAndTearDown + testsInTestSuiteWithTearDown
        template= """
        CREATE PROCEDURE %s AS
        BEGIN
            PRINT 'I was generated by the test_tsqlunit.py testcase, you can safely delete me'
        END
        """
        
        for spName in allTests:
            self.crsr.execute(template % spName)

        self.crsr.callproc("tsu_describe") 

        rs=self.crsr.fetchall()

        assert len(rs) == len(allTests)-countSetupAndTearDowns
        for test,testsuite,hasSetup,hasTeardown in rs:
            assert test in allTests
            assert string.upper(test[-5:])<>'SETUP'
            assert string.upper(test[-8:])<>'TEARDOWN'                                                
            testdata=('',0,0)
            if test in testsInTestSuite:
                testdata = ('someTestSuite',0,0)
            if test in testsInTestSuiteWithSetup:
                testdata = ('testsuite2',1,0)
            if test in testsInTestSuiteWithSetupAndTearDown:
                testdata = ('ts3',1,1)            
            if test in testsInTestSuiteWithTearDown:
                testdata = ('suite4',0,1)
            oksuite,okhasSetUp,okhasTeardown = testdata
            assert testsuite == oksuite
            assert hasSetup == okhasSetUp
            assert hasTeardown == okhasTeardown             
             
    def testOneRunOneFailure(self):
        proc = """
        CREATE PROCEDURE ut_aFailingTest AS
        BEGIN
            PRINT 'I was generated by the test_tsqlunit.py testcase, you can safely delete me'
            EXEC tsu_failure 'a failing message'
        END
        """
        self.crsr.execute(proc)
        success,testcount,failures,errors = self.callRunTestsReturnOutputParameters()
        assert success==0
        assert testcount==1
        assert failures==1
        assert errors==0

    def testSeveralRuns(self):
        nrOfFailures=5
        proc = """
        CREATE PROCEDURE ut_aFailingTest%i AS
        BEGIN
            PRINT 'I was generated by the test_tsqlunit.py testcase, you can safely delete me'
            EXEC tsu_failure 'a failing message'
        END
        """
        for nr in range(nrOfFailures):
            self.crsr.execute(proc % nr)
        nrOfErrors=3
        proc = """
        CREATE PROCEDURE ut_testWithError%i AS
        BEGIN
            PRINT 'I was generated by the test_tsqlunit.py testcase, you can safely delete me'
            SELECT 1/0 AS ThisWillBeAnError           
        END
        """
        for nr in range(nrOfErrors):
            self.crsr.execute(proc % nr)
 
        success,testcount,failures,errors = self.callRunTestsReturnOutputParameters()
      
        assert success == 0
        assert testcount == nrOfErrors + nrOfFailures
        assert failures == nrOfFailures
        assert errors == nrOfErrors        
        
        
    def testRunNoTestcases(self):
        success,testcount,failures,errors = self.callRunTestsReturnOutputParameters()
        assert success == 1
        assert testcount == 0
        assert failures == 0
        assert errors == 0             


    def testOneRunOneError(self):
        proc = """
        CREATE PROCEDURE ut_testWithError AS
        BEGIN
            PRINT 'I was generated by the test_tsqlunit.py testcase, you can safely delete me'
            SELECT 1/0 AS ThisWillBeAnError           
        END
        """
        self.crsr.execute(proc)
        success,testcount,failures,errors = self.callRunTestsReturnOutputParameters()
        assert success==0
        assert testcount==1
        assert failures==0
        assert errors==1

    def testThatTestcasesRunsInATransaction(self):
        otherConn=adodbapi.connect(constr)
        otherCrsr=otherConn.cursor()
        maybeDropTableStatment="""
            IF EXISTS(SELECT name 
                  FROM sysobjects 
                  WHERE  name = N'tempDeleteMe' 
                  AND    type = 'U')
                DROP TABLE tempDeleteMe  
        """
        maybeDropProcStatment="""
            IF EXISTS(SELECT name 
                  FROM  sysobjects 
                  WHERE  name = N'ut_evilTestCase' 
                  AND  type = 'P')
                DROP PROCEDURE ut_evilTestCase  
        """        
        createTableStatement= """
        CREATE TABLE tempDeleteMe (
            aColumn varchar(500) NULL)         
        """
        insertStatement="""
        INSERT INTO tempDeleteMe(aColumn)
        VALUES('Important data, should not be deleted by the testcase')
        """
        otherCrsr.execute(maybeDropTableStatment)
        otherCrsr.execute(createTableStatement)
        otherCrsr.execute(insertStatement)        
        otherConn.commit()
        otherConn.close()
        otherConn=None
        otherCrsr=None
        
        
        evilproc="""
        CREATE PROCEDURE ut_evilTestCase AS
            TRUNCATE TABLE tempDeleteMe
        """
        
        self.crsr.execute(evilproc)
        self.crsr.callproc('tsu_runTests')
        self.conn.commit()
        
        self.crsr.execute("SELECT aColumn FROM tempDeleteMe")
        rs=self.crsr.fetchone()
        assert rs[0]=='Important data, should not be deleted by the testcase'

        #clean Up
        otherConn=adodbapi.connect(constr)
        otherCrsr=otherConn.cursor()        
        otherCrsr.execute(maybeDropTableStatment)
        otherCrsr.execute(maybeDropProcStatment)
        otherConn.commit()
        
    def callRunTestsReturnOutputParameters(self):
        # According to Knowledge base article Q313861 a recordset will not return to ADO if the
        # stored procedure fails with an severe error. As a work around, the result of the
        # last test is saved to the table tsuLastTestResult
        try:
            retval=self.crsr.callproc("tsu_runTests")
        except: 
            pass  #ADO aborts when the error occurs, ignore
        self.crsr.execute("SELECT success, testCount, failureCount, errorCount FROM tsuLastTestResult")
        rs=self.crsr.fetchone()
        return rs

    def testCreateTestResult(self):
        countQuestion="SELECT COUNT(*) FROM tsuTestResults"
        self.crsr.execute(countQuestion)
        before=self.crsr.fetchone()[0]       

        self.crsr.callproc("tsu__private_createTestResult")

        self.crsr.execute(countQuestion)        
        after=self.crsr.fetchone()[0]
        assert after-before == 1

    def printTestProcedures(self):
        self.crsr.execute("select name from sysobjects where xtype='P' and name LIKE 'ut[_]%' ")
        self.printRs()
        
    def printRs(self,rs=None):
        if rs==None:
            rs=self.crsr.fetchall()
        print 'Recordset:'
        for row in rs:
            print row
        print '-------'

    def testASuite(self):
        setupproc="""
        CREATE PROCEDURE ut_mysuite_setup AS
        BEGIN
            CREATE TABLE aTableCommonToTheTests ( aColumn varchar(500) NULL)
            INSERT INTO aTableCommonToTheTests(aColumn) VALUES('Some value')        
        END"""
        self.crsr.execute(setupproc)
        procOne="""
        CREATE PROCEDURE ut_mysuite_test1 AS
            IF (1 <> (SELECT COUNT(*) FROM aTableCommonToTheTests))
                EXEC tsu_failure 'There should be one row in the common table'
        """        
        self.crsr.execute(procOne)
        procTwo="""
        CREATE PROCEDURE ut_mysuite_test2 AS
            IF (1 <> (SELECT COUNT(*) FROM aTableCommonToTheTests))
                EXEC tsu_failure 'There should be one row in the common table'
        """
        self.crsr.execute(procTwo)
        
        success,testcount,failures,errors = self.callRunTestsReturnOutputParameters()

        assert success==1
        assert testcount==2
        assert failures==0
        assert errors==0

    def testTearDownIsExecuted(self):
        procOne="""
        CREATE PROCEDURE ut_mysuite_test1 AS
            PRINT 'I Do nothing'
            IF (1 <> (SELECT COUNT(*) FROM aTableCommonToTheTests))
                EXEC tsu_failure 'There should be one row in the common table'
        """        
        self.crsr.execute(procOne)
        procTearDown="""
        CREATE PROCEDURE ut_mysuite_tEaRdOwN AS
            EXEC master.dbo.xp_cmdShell 'type "deleteme">C:\deleteme.txt'
        """
        self.crsr.execute(procTearDown)
        lst=os.listdir('C:\\')
        assert 'deleteme.txt' not in lst

        self.callRunTestsReturnOutputParameters()
        
        lst=os.listdir('C:\\')
        assert 'deleteme.txt' in lst
        os.remove('C:\\deleteme.txt')

    def testCookbookExampleOne(self):
        proc="""
        CREATE PROCEDURE ut_testCapitalize AS
        BEGIN
            DECLARE @outStr VARCHAR(500)
            EXEC capitalize 'a string', @outStr OUT
            IF (@outStr <> 'A string') OR @outStr IS NULL
                EXEC tsu_failure 'Capitalize should make the first character uppercase'
        END        
        """
        self.crsr.execute(proc)
        
        success,testcount,failures,errors = self.callRunTestsReturnOutputParameters()
        assert success==0
        assert testcount==1
        assert failures==1
        assert errors==0

    def testCookbookExampleTwo(self):
        proc="""
        CREATE PROCEDURE ut_testCapitalize AS
        BEGIN
            DECLARE @outStr VARCHAR(500)
            EXEC capitalize 'a string', @outStr OUT
            IF (ASCII(LEFT(@outStr,1)) <> ASCII('A')) OR @outStr IS NULL
                EXEC tsu_failure 'Capitalize should make the first character uppercase'
        END                    
        """
        self.crsr.execute(proc)
        proc="""
        CREATE PROCEDURE capitalize @inStr VARCHAR(500), @outStr VARCHAR(500) OUT AS
        BEGIN
            SET @outStr='A string'
        END        
        """
        self.crsr.execute(proc)
        
        success,testcount,failures,errors = self.callRunTestsReturnOutputParameters()
        assert success==1
        assert testcount==1
        assert failures==0
        assert errors==0

        proc="""
        ALTER PROCEDURE capitalize @inStr VARCHAR(500), @outStr VARCHAR(500) OUT AS
        BEGIN
            SET @outStr='a string'
        END
        """
        
        self.crsr.execute(proc)

        success,testcount,failures,errors = self.callRunTestsReturnOutputParameters()
        assert success==0
        assert testcount==1
        assert failures==1
        assert errors==0

        

    def testThatAlterWorks(self):
        proc="""
        CREATE PROCEDURE ut_test AS
           EXEC tsu_failure 'failuremsg'
        """
        self.crsr.execute(proc)
       
        success,testcount,failures,errors = self.callRunTestsReturnOutputParameters()
        assert success==0
        assert testcount==1
        assert failures==1
        assert errors==0
        proc="""
        ALTER PROCEDURE ut_test AS
           PRINT 'I am running but not failing'
        """
        self.crsr.execute(proc)

        success,testcount,failures,errors = self.callRunTestsReturnOutputParameters()
        assert success==1
        assert testcount==1
        assert failures==0
        assert errors==0

    def tearDown(self):
        self.conn.rollback()
        self.conn.close()
        self.conn=None
        self.crsr=None

class VisibleTestTSQLUnit(unittest.TestCase):

    sqlInputFileName="osqlinput.txt"
    sqlOutputFileName="osqloutput.txt"

    nameOfSuite="TheNameOfTheSuite"
    procedureNames=('ut_' + nameOfSuite + '_theFirstTest',
                    'ut_' + nameOfSuite + '_theSecondTest',
                    'ut_' + nameOfSuite + '_theThirdTest')
    
    def testFeedbackFailure(self):
        failureMessage="The failure message looks like this."

        sql="""
            CREATE PROCEDURE %s AS
                EXEC tsu_failure '%s'
            GO
            CREATE PROCEDURE %s AS
                PRINT 'Something'
            GO
            CREATE PROCEDURE %s AS
                PRINT 'Something'
            GO
            EXEC tsu_runTests
            GO
        """ % (self.procedureNames[0],
               failureMessage,
               self.procedureNames[1],
               self.procedureNames[2])
        
        output=self.runSQLthroughOSQL(sql)
        #Look for important things in the visible output
        lookfor=[ failureMessage,
                 "Test: " + self.procedureNames[0],
                 "3 tests, of which 1 failed and 0 had an error.",
                 "Testsuite: " + self.nameOfSuite + " (3 tests)",
                 "FAILURE!" ]
        for findThis in lookfor:
            assert string.find(output,findThis) > -1,findThis
            
    def testFeedbackOk(self):
        sql="""
            CREATE PROCEDURE %s AS
                PRINT 'The print statement shows up'
            GO
            CREATE PROCEDURE %s AS
                PRINT 'Something'
            GO
            CREATE PROCEDURE %s AS
                PRINT 'Something'
            GO
            EXEC tsu_runTests
            GO
        """ % self.procedureNames
        
        output=self.runSQLthroughOSQL(sql)

        #Look for important things in the visible output
        lookfor=[ "3 tests, of which 0 failed and 0 had an error.", 
                  "SUCCESS!",
                  "The print statement shows up"]
        for findThis in lookfor:
            assert string.find(output,findThis) > -1, findThis            
    
        
    def runSQLthroughOSQL(self,sql):
        inputFile=open(self.sqlInputFileName,"w")
        inputFile.write(sql)
        inputFile.close()
        
        cmdline= "osql -U %s -P %s -S %s -d %s -i %s -o %s -w 500" % ( _uid ,_pwd ,_computername, _database,
                                                                self.sqlInputFileName, self.sqlOutputFileName)
        os.system(cmdline)

        outputFile=open(self.sqlOutputFileName,"r")
        f=outputFile.read()
        outputFile.close()
        return f        

    def tearDown(self):
        try:
            s=""
            for proc in self.procedureNames:
                s= s + " DROP PROCEDURE " + proc + "\n GO \n"
            self.runSQLthroughOSQL(s)
        except:
            pass
        try:
            os.remove(self.sqlInputFileName)
        except:
            pass
        try:
            os.remove(self.sqlOutputFileName)
        except:
            pass        
    
        

if __name__ == '__main__':
    unittest.main()