## ## """ GOALS: 1. Build a GUI front end for my customer database; learning Python & SQLite along the way. 2. ?????? 3. Profit DISCLAIMAR :: I am not a Database Administrator, nor am I an experienced C/C++/Perl/Ruby/etc coder, but I studied C++ & Java in College (CS). I am not an expert in anything other then MM Director. I am a Multimedia Developer (Flash, Director, js, websites & CDROM front ends) & PM. ...And I am a complete newbie in Python, this is my first attempt at a complete application. I needed it to go beyond the trival and simple one method "Hello World" apps. When I decided to get out of the multimedia world and return to my roots as a problem solver/coder, I chose Python, and like many newbies I tried to find some examples that went from design to rollout (so to speak)... In other words, show me a complete application that works which uses a database (I googled for this and it appears to be a common request from newbies). There where none that satisfied my needs. Let alone worked with SQLite in a meaningful way. So, I RTFM'ed and rolled my own. Due to my lack of experince and understanding of Python, there are plenty of mistakes; improper use of methods and functions and a host of other issues (including the small DB I made). Parts of this progarm are verbose and repetitive. And yet it works and behaviors as I excepted to, but I wouldn't use this program beyond personal use (like my own small business, in this case) and as a learning tool (right, wrong, whatever), it simply isn't designed that well (READ => very little error checking). Since I copied 80-90% of this code from knowledgeable others, I want to add it to the Python collective of code samples for what it is. :: - A very simple customer style database that uses SQLite and GUI (PythonCard) techniques. - :: A few of the methods I used: regexp lambda list comprehension parent/child window communication SQL multiple classes dynamic updating etc. If someone can improve on this program (and I know someone can), then please share those improvements so others may learn (including me). Scott Mallory EbbyFish Multimedia, LLC www.ebbyfish.com Steps: 1. Create database - based on PySQLite: Python DB-API 2.0 Compliant doc [SUCCESS] 2. Connect to/create db and populate first row (via command line - > pysqliteBrowse.py ) [SUCCESS] 3. Bind all of the GUI text fields to an array, OUTPUT to textfields - based on the Address.py file found in the PythonCard demo [SUCCESS] 4. Create (cut n paste) a database class for the Modifried[sic] Address.py, now CustomerDB.py [SUCCESS] 5. Bind it to the GUI; nextRec and prevRec (it works, but it can be better) [SUCCESS] 6. Create save, delete, update methods [SUCCESS] 7. General clean up [Close enough] Author: Scott Mallory eMail: sm@ebbyfish.com wSite: www.ebbyfish.com Date started: Nov. 21, 2004 Date finished: Dec. 10, 2004 (for the most part) ! This DB reqires at least ONE record to exist in both Tables. See the README! SQL Tips for newbies like me... http://www.techonthenet.com/sql/insert.htm The syntax for the SELECT statement is: SELECT columns FROM tables WHERE predicates; The syntax for the INSERT statement is: INSERT INTO table (column-1, column-2, ... column-n) VALUES (value-1, value-2, ... value-n); The syntax the UPDATE statement is: UPDATE table SET column = expression WHERE predicates; The syntax for the COUNT function is: SELECT COUNT (expression ) FROM tables WHERE predicates; The syntax for the DELETE statement is: DELETE FROM table WHERE predicates; """ from PythonCard import configuration, dialog, model, util import os, sys, re import sqlite import string import wx # child window grid thingy import dbtable # this should come from the GUI as a return value or we can build the db on the fly # if it doesn't already exist connection={'databasename':'cust_demo.db','directory':'E:\\_EbbyFish INC\\xx_PYTHON\\projects\\db'} ## The below vars are written for the database I created, it has some issues but I am new to ## sql so give me break. # Look at the last value in this list, it is an Area, writing this out in long form solved some problems... TEXT_FIELD_NAMES = ['Field_name','Field_agcomp','Field_divdept','Field_addy1','Field_addy2','Field_city','Field_state','Field_zip','Field_phone','Field_fax','Field_email','Field_machcode','Field_header1','Field_header2','Field_cardset','Field_dippack','Field_balance','Area_notes'] #modified text field name list -> added 2 fields at end BNAMES = ['name','agcomp','divdept','addy1','addy2','city','state','zip','phone','fax','email','machcode','cardset','dippack','balance','header1','header2'] # this is for Table one Table_Cust_Names = ['Field_name','Field_agcomp','Field_divdept','Field_addy1','Field_addy2','Field_city','Field_state','Field_zip','Field_phone','Field_fax','Field_email','Field_machcode','Field_cardset','Field_dippack','Field_balance'] # this is for Table two Table_Data_Names = ['Field_name','Field_header1','Field_header2','Area_notes'] ## This is for the Next/Prev Record lookup method, notice some of the fields are from another table SQLSCROLLSTRING = """select TCust.TCust_id, TCust.TCust_name, TCust.TCust_agcomp, TCust.TCust_divdept, TCust.TCust_addy1, TCust.TCust_addy2, TCust.TCust_city, TCust.TCust_state, TCust.TCust_zip, TCust.TCust_phone, TCust.TCust_fax, TCust.TCust_email, TCust.TCust_machcode, TData.TData_header1, TData.TData_header2, TCust.TCust_cardset, TCust.TCust_dippack, TCust.TCust_balance, TData.TData_notes from TCust, TData where TCust.TCust_id = TData.TData_id and TCust.TCust_id=""" ## A Dictionary for the update method, note the last two items dTableList = {'Field_name':'TCust_name', 'Field_agcomp':'TCust_agcomp', 'Field_divdept':'TCust_divdept', 'Field_addy1':'TCust_addy1', 'Field_addy2':'TCust_addy2', 'Field_city':'TCust_city', 'Field_state':'TCust_state', 'Field_zip':'TCust_zip', 'Field_phone':'TCust_phone', 'Field_fax':'TCust_fax', 'Field_email':'TCust_email', 'Field_machcode':'TCust_machcode', 'Field_cardset':'TCust_cardset', 'Field_dippack':'TCust_dippack', 'Field_balance':'TCust_balance', 'Field_header1':'TData_header1', 'Field_header2':'TData_header2', 'Area_notes':'TData_notes', 'file':'TData_file' } dbTable_TCust = """create table TCust( TCust_id integer primary key, TCust_name varchar(50), TCust_agcomp varchar(70), TCust_divdept varchar(70), TCust_addy1 varchar(60), TCust_addy2 varchar(60), TCust_city varchar(50), TCust_state varchar(20), TCust_zip varchar(20), TCust_phone varchar(25), TCust_fax varchar(25), TCust_email varchar(80), TCust_machcode varchar(50), TCust_cardset varchar(20), TCust_dippack varchar(2), TCust_balance varchar(5));""" dbTable_TData = """create table TData( TData_id integer primary key, TData_name varchar(50), TData_notes varchar(200), TData_header1 varchar(30), TData_header2 varchar(30), TData_fileBlob varchar(300))""" class Document: def __init__(self, view): self.view = view self.current = -1 self.setDropDown() self.records = [] def setDropDown(self): # Set the Combo box, the Grid and the id num to 1 tablenames = dbObj.getTables() print tablenames self.view.components['ComboBox1'].items = tablenames self.view.components['ComboBox1'].stringSelection = tablenames[0] col = dbObj.getCols(tablenames[0]) if col is not None: row = dbObj.getRows(tablenames[0]) self.view.tableWindow.setMyTable(col, row) r = dbObj.nextRec(0) self.buildBrowseInterface(r) def getDataArea(self): "Called from a text update method" # Get the text from the email (pasted in) # - my server formats the emails # Parse the data -> everything right of the '=' sign is good char # Populate the intended textFields # The 'Save Customer' button will handle the database method # # ## Get the data ## # strData = self.view.components['inputText'].text print strData # make a list from crap listData = strData.splitlines() try: listData.remove('------------------------------') except: pass # ## Parse the data ## # # Let's get everything right of the '=' sign, including NULLS # common variables # NOTE: the space in '. *' is for the NULL strings, which are permitted in my DB seekStr = r'(?<==)(?P. *)' InterfaceList = [] for s in listData: try: match_obj = re.search(seekStr, s, re.IGNORECASE| re.VERBOSE).group(1) InterfaceList.append(match_obj.strip()) except AttributeError: # not sure if this is even working, but it stopped breaking... InterfaceList.append(' ') self.buildInterface(InterfaceList) self.view.components['inputText'].text = "" def buildInterface(self, list): "Populate the Text Fields" # called only from getAreaData self.clearFields() # always start fresh print list i = 0 for wName in TEXT_FIELD_NAMES: try: self.view.components['Text%s' % wName].text = list[i]; i+=1 except: pass def clearFields(self): for wName in TEXT_FIELD_NAMES: self.view.components['Text%s' % wName].text = " " def buildBrowseInterface(self, record): # this method is for 'scrolling' through the db #print record i = 0 self.view.components['TextField_current'].text = str(record[0]) # print self.view.components['TextField_current'].text tmpList = list(record) tmpList.pop(0) recList = tmpList for wName in TEXT_FIELD_NAMES: self.view.components['Text%s' % wName].text = recList[i]; i+=1 def goNextRecord(self): "Next Record" index = self.view.components['TextField_current'].text r = dbObj.nextRec(index) self.buildBrowseInterface(r) def goPrevRecord(self): "Previous Record" index = self.view.components['TextField_current'].text r = dbObj.prevRec(index) self.buildBrowseInterface(r) def goFirstRecord(self): # Only called form Menu bar r = dbObj.firstRec() self.buildBrowseInterface(r) def goLastRecord(self): # Only called from Menu bar r = dbObj.lastRec() self.buildBrowseInterface(r) def findRecord(self, searchText, caseSensitive): # This approach will not work as my DB can have duplicates # It should return a list of ALL Matches in a grid, prehaps later that will be address pass def deleteRecord(self): recNumber = self.view.components['TextField_current'].text dbObj.removeExistingRecord(recNumber) self.view.components['TextField_current'].text = "" self.clearFields() #print 'Delete Record' def updateRecord(self): "Test & collect any 'dirty' fields => Update db" upDateTable_C_List = [] upDateValue_C_List = [] upDateTable_D_List = [] upDateValue_D_List = [] recNumber = self.view.components['TextField_current'].text for wName in Table_Cust_Names: dirty = self.view.components['Text%s' % wName].isModified() if dirty: upDateTable_C_List.append(wName) upDateValue_C_List.append(self.view.components['Text%s' % wName].text) for wName in Table_Data_Names: dirty = self.view.components['Text%s' % wName].isModified() if dirty: upDateTable_D_List.append(wName) upDateValue_D_List.append(self.view.components['Text%s' % wName].text) # call the update method, nothing is returned dbObj.updateExistingRecord(upDateTable_C_List, upDateValue_C_List, upDateTable_D_List, upDateValue_D_List, recNumber) def saveRecord(self): # First Table data = [] for wName in Table_Cust_Names: str = self.view.components['Text%s' % wName].text data.append(str) #print data #Second Table secData = [] for wName in Table_Data_Names: str = self.view.components['Text%s' % wName].text secData.append(str) secData.append('EMPTY') # a placeholder for a file object to be built later # Make SQL Strings SQLStr1 = '","'.join(["%s" % (k) for k in data]) SQLStr2 = '","'.join(["%s" % (v) for v in secData]) dbObj.addNewRecord(SQLStr1, SQLStr2) self.clearFields() def printRecord(self): # we will print to screen then wite a PDF class later print 'Print Record' def createDataFile(self): # This will write out a 'data file' then call another app # to encrypt the file print 'Create master file' def setTable(self, table): row = dbObj.getRow(table) col = dbObj.getCols(table) #print row #print col tab = [col, row] return tab # Class browse is copied from pysqliteBrowse.py (cut n paste) all of my # 'attempts' at hacking are denoted with a SM - 11/30/2004 # Apologies to Andy Todd... # KEA 2003-01-19 # see http://www.hwaci.com/sw/sqlite/faq.html class browse: # Connection should be a dictionary with at least two keys, # 'databasename' and 'directory' # This is wildly different to other database modules def __init__(self, connection): "Setup the database connection" self._system_tables = [] # Not providing a db name is guaranteed to ruin our connection if not connection['databasename']: raise ValueError filename = os.path.join(connection['directory'], connection['databasename']) self._db = sqlite.connect(filename, autocommit=1) # Is autocommit bad form? self._cursor = self._db.cursor() # SM # This is for scrolling thru the records self._cu_scrollPos = self._db.cursor() self._cu_scrollPos.execute('select * from TCust') # SM end # This one is used in getRow self._tableName = '' # Begin SM 11/30/2004 def addNewRecord(self, SQLStr_1, SQLStr_2): """Adds a record to the database. Test for an existing name AND agency name (so we don't get duplicates). Then add the new record. """ ### CREATE UNIQUE INDEX IDX_NAME ON TCust(TCust_name,TCust_agcomp) ### my db can have the same name many times as well as agcomp (Agency) ### Stan Pick , Boulder PD, a OK ### Stan Pick , Boudler COMM, s OK ### Jonny Smooth, Boulder COMM, s OK ### Stan Pick, Boulder PD, z NOT OK -> already exist, a dialog? -> Should this be an update event? Y N ? ## First Table cols_1 = self.getCols("TCust") cols_1.remove('TCust_id') # this db autoincrements so i don't need to worry about this col colNames_1 = ",".join(tuple(cols_1)) stmt = "insert into TCust (" + colNames_1 + ") values (\"" + SQLStr_1 + "\");" ## Second Table cols_2 = self.getCols("TData") cols_2.remove('TData_id') colNames_2 = ",".join(tuple(cols_2)) stmt_2 = "insert into TData (" + colNames_2 + ") values (\"" + SQLStr_2 + "\");" try: self._cursor.execute(stmt) except: # replace with message window print "This record already exist" return 0 print stmt_2 # this should only fire off if the 'try:' passed self._cursor.execute(stmt_2) def updateExistingRecord(self, listOfTables_C, listOfValues_C,listOfTables_D, listOfValues_D, id): ### Can only update one table at a time ### maybe a cascade event? # Parse out table names from dictionary then builds sql query string for each table to be updated rstr = re.compile("\ATD.*") # looking for the 'TD' in TData tcust, tdata = False, False colStr_1, colStr_2 = [], [] colNames_1 = map(lambda k: dTableList.get(k), listOfTables_C) colNames_2 = map(lambda v: dTableList.get(v), listOfTables_D) # I'm sure there is a better way to do this... SM for t in colNames_1: m = re.match("^TC", t) if m: tcust = True; break for r in colNames_2: m = re.match("^TD", r) if m: tdata = True; break ## TCust Table if tcust: # Build the string for t,v in zip(colNames_1, listOfValues_C): colStr_1.append("%s='%s'" % ( t, v )) c1 = ",".join(["%s" % k for k in colStr_1]) # sql stmt = 'update TCust' stmt = stmt + ' set %s' % c1 stmt = stmt + ' where TCust_id=\'%s\'' % id print stmt self._cursor.execute(stmt) ## TData Table if tdata: # Build the string for t,v in zip(colNames_2, listOfValues_D): colStr_2.append("%s='%s'" % ( t, v )) c2 = ",".join(["%s" % k for k in colStr_2]) # sql stmt = 'update TData' stmt = stmt + ' set %s' % c2 stmt = stmt + ' where TData_id=\'%s\'' % id print stmt self._cursor.execute(stmt) def removeExistingRecord(self, recNumber): ## rewrite so data base support Cascade Delete (if SQlite supports this ( a trigger statement?)) self._cursor.execute('delete from TCust where TCust_id=%s' % recNumber) self._cursor.execute('delete from TData where TData_id=%s' % recNumber) # reset the scroll cursor self._cu_scrollPos.execute('select * from TCust') def undoLastOp(self): #rollback pass ## thsi fails if there are no records...must find better method def getCols(self, tableName): "Return only the column names, nothing else." stmt = "select * from " + tableName self._cursor.execute(stmt) try: row = self._cursor.fetchone() return row.keys() except: pass ## The following two functions beg for a recursive method, but ## I'm not sure how to write it. But! would be a good idea just because it can be done? ## Methinks they can be the same function as well... def nextRec(self, index): ### A very ugly (READ => verbose) method in which you can get a scads of ### data from 2 or more tables. I am sure there is a better way, but ### a this time I do not know it. SM 12-6-04 ### index = int(index) self._cursor.execute('select min(ROWID) from TCust') firstrow = self._cursor.fetchone() bof = firstrow[0] self._cursor.execute('select max(ROWID) from TCust') lastrow = self._cursor.fetchone() eof = lastrow[0] index += 1 self._cursor.execute('%s%d' % (SQLSCROLLSTRING, index)) rec = self._cursor.fetchone() print "\nIndex = ", index print rec while rec is None: index += 1 if index > eof: index = bof self._cursor.execute('%s%d' % (SQLSCROLLSTRING, index)) rec = self._cursor.fetchone() return rec def prevRec(self, index): index = int(index) self._cursor.execute('select min(ROWID) from TCust') firstrow = self._cursor.fetchone() bof = firstrow[0] self._cursor.execute('select max(ROWID) from TCust') lastrow = self._cursor.fetchone() eof = lastrow[0] index -= 1 self._cursor.execute('%s%d' % (SQLSCROLLSTRING, index)) rec = self._cursor.fetchone() while rec is None: index -= 1 if index < bof: index = eof self._cursor.execute('%s%d' % (SQLSCROLLSTRING, index)) rec = self._cursor.fetchone() return rec def firstRec(self): self._cursor.execute('select min(ROWID) from TCust') firstRow = self._cursor.fetchone() return firstRow def lastRec(self): self._cursor.execute('select max(ROWID) from TCust') lastRow = self._cursor.fetchone() return lastRow # End SM 11/30/2004 def getTables(self): "Return a list of all of the non-system tables in " stmt = "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name;" self._cursor.execute(stmt) # I'm using a list comprehension here instead of a for loop, # either will do but I think this is more concise (unlike this comment) return [ x[0] for x in self._cursor.fetchall() if x[0] not in self._system_tables ] def getColumns(self, tableName): "Get the definition of the columns in tableName" stmt = "select * from " + tableName self._cursor.execute(stmt) row = self._cursor.fetchone() columnDefs = [] # SM - I may be wrong, but doesn't _cursor.description do the same thing as this loop? for column in row.keys(): columnName = column dataType, nullable, key, default = "varchar", "", "", "" # Dodgy default, but if works for me precision = 255 columnDefs.append((columnName, dataType, precision, nullable, key, default)) return columnDefs def getQueryString(self, tableName): "Return a SQL statement which queries all of the columns in tableName" tableStructure = self.getColumns(tableName) # Construct and return the string stmt='SELECT ' for columnList in tableStructure: stmt+=columnList[0]+', ' stmt=stmt[:-2]+' FROM '+tableName print stmt return stmt def getRow(self, tableName): "Get a row from tableName" # When we upgrade to 2.2 this will be a great candidate for a # generator/iterator. In the meantime we use self._tableName to keep # track of what we are doing # SM -- I'm not sure what KEA is doing, so I am # simplifing it...In my case I will always know what table I am using ##if tableName!=self._tableName: ## self._tableName=tableName self._cursor.execute( "select * from " + tableName ) #(self.getQueryString(tableName)) try: result = self._cursor.fetchone() except: print "KEA argh!" result=[] return result def getRows(self, tableName): "Get all of the rows from tableName" ##if tableName!=self._tableName: ## self._tableName=tableName self._cursor.execute( "select * from " + tableName ) #(self.getQueryString(tableName)) try: result = self._cursor.fetchall() except: result = [] return result class Addresses(model.Background): def on_initialize(self, event): # SM - the grid window self.tableWindow = model.childWindow(self, dbtable.DBTable) self.tableWindow.position = (200, 5) self.tableWindow.visible = False # End SM self.document = Document(view=self) def on_exit_command(self, event): self.close() def on_goPrev_command(self, event): self.document.goPrevRecord() def on_goNext_command(self, event): self.document.goNextRecord() def on_goFirst_command(self, event): self.document.goFirstRecord() def on_goLast_command(self, event): self.document.goLastRecord() def on_findRecord_command(self, event): result = dialog.findDialog(self) if result.accepted: self.document.findRecord(result.searchText, result.caseSensitive) def on_editUndo_command(self, event): widget = self.findFocus() if hasattr(widget, 'editable') and widget.canUndo(): widget.undo() def on_editRedo_command(self, event): widget = self.findFocus() if hasattr(widget, 'editable') and widget.canRedo(): widget.redo() def on_editCut_command(self, event): widget = self.findFocus() if hasattr(widget, 'editable') and widget.canCut(): widget.cut() def on_editCopy_command(self, event): widget = self.findFocus() if hasattr(widget, 'editable') and widget.canCopy(): widget.copy() def on_editPaste_command(self, event): widget = self.findFocus() if hasattr(widget, 'editable') and widget.canPaste(): widget.paste() def on_editClear_command(self, event): widget = self.findFocus() if hasattr(widget, 'editable'): if widget.canCut(): # delete the current selection, # if we can't do a Cut we shouldn't be able to delete either # which is why i used the test above sel = widget.replaceSelection('') else: ins = widget.getInsertionPoint() try: widget.replace(ins, ins + 1, '') except: pass def on_editSelectAll_command(self, event): widget = self.findFocus() if hasattr(widget, 'editable'): widget.setSelection(0, widget.getLastPosition()) def on_editSaveRecord_command(self, event): self.document.saveRecord() def on_editDeleteRecord_command(self, event): "Deletes current record" result = self.on_buttonMessage_mouseClick(None) if result == 1: self.document.deleteRecord() def on_editUpdateRecord_command(self, event): 'Update the fields that have been "dirtied"' # "Dirty boy, dirty-dirty-dirty!" - Morgan Proctor self.document.updateRecord() def on_editPrintRecord_command(self, event): self.document.printRecord() def on_editCreateMFile_command(self, event): "A method that creates a license file (text), particluar to my software." self.document.createDataFile() def on_showTable_command(self, event): target = event.target if target.label == 'Show Table': self.tableWindow.visible = True target.label = 'Hide Table' else: self.tableWindow.visible = False target.label = 'Show Table' def on_inputText_textUpdate(self, event): self.document.getDataArea() def on_ComboBox1_textUpdate(self, event): col = dbObj.getCols(event.target.text) row = dbObj.getRows(event.target.text) # see the dbtable.py file for the code on parsing these return values self.tableWindow.setMyTable(col, row) def on_buttonMessage_mouseClick(self, event): """ result = dialog.messageDialog(self, 'a message', 'a title', wx.ICON_ERROR | wx.YES_NO) """ print "Howdy" result = dialog.messageDialog(self, 'Are you sure you want to delete this Record?', 'Delete Customert Record', wx.ICON_EXCLAMATION | wx.YES_NO | wx.NO_DEFAULT | wx.CANCEL) return result.accepted if __name__ == '__main__': dbObj = browse(connection) ## Calling the database form here makes it visable to all Classes app = model.Application(Addresses) app.MainLoop()