3 import sys, getopt, os, smtplib, commands, time, gtk, gtk.glade
5 protocols = ('POP3', 'APOP', 'IMAP',)
8 temp = "/usr/tmp/torturestest-%d" % os.getpid()
10 def __init__(self, line=None):
11 "Initialize site data from the external representation."
19 self.capabilities = ""
23 (self.host, self.mailaddr, self.username, self.password, \
24 self.protocol, self.ssl, self.options, self.capabilities, \
25 self.recognition, self.comment) = \
26 line.strip().split(":")
28 self.mailaddr = self.username
34 "Return a tuple consisting of all this site's attributes."
35 return (self.host, self.mailaddr, self.username, self.password, \
36 self.protocol, self.ssl, self.options, self.capabilities, \
37 self.recognition, self.comment)
40 "Return the external representation of this site's data."
41 return ":".join(self.allattrs())
43 def prettyprint(self):
44 "Prettyprint a site entry in human-readable form."
52 "Capabilities: %s\n" \
58 "Print a .fetchmailrc entry corresponding to a site entry."
59 rep = "poll %s-%s via %s with proto %s %s\n" \
60 " user %s there with password '%s' is esr here" \
61 % (self.host,self.protocol,self.host,self.protocol,self.options,self.username,self.password)
62 if self.ssl and self.ssl != 'False':
68 "Print an HTML server-type table entry."
69 return "<tr><td>%s: %s</td><td>%s</td>\n" \
70 % (self.protocol, self.comment, self.capabilities)
74 rep = "%s %s at %s" % (self.protocol, self.recognition, self.host)
76 rep += " (" + self.capabilities + ")"
78 rep += " using " + self.options
81 def testmail(self, n=None):
82 "Send test mail to the site."
83 server = smtplib.SMTP("localhost")
84 fromaddr = "esr@thyrsus.com"
85 if self.mailaddr.find("@") > -1:
86 toaddr = self.mailaddr
88 toaddr = "%s@%s" % (self.mailaddr, self.host)
89 msg = ("From: %s\r\nTo: %s\r\n\r\n" % (fromaddr, toaddr))
92 msg += "Test mail collected from %s.\n" % (self.id(),)
93 server.sendmail(fromaddr, toaddr, msg)
96 def fetch(self, logfile=False):
97 "Run a mail fetch on this site."
99 ofp = open(TestSite.temp, "w")
101 mda = "(echo; echo \'From torturetest\' `date`;cat) >>TEST.LOG"
104 ofp.write('defaults mda "%s"\n' % mda)
105 ofp.write(self.entryprint())
107 (self.status, self.output) = commands.getstatusoutput("fetchmail -d0 -v -f - <%s"%TestSite.temp)
109 os.system("cat " + TestSite.temp)
111 os.remove(TestSite.temp)
114 "Did we have a test failure here?"
115 return os.WIFEXITED(self.status) and os.WEXITSTATUS(self.status) > 1
118 "Explain the status of the last test."
119 if not os.WIFEXITED(self.status):
120 return self.id() + ": abnormal termination\n"
121 elif os.WEXITSTATUS(self.status) > 1:
122 return self.id() + ": %d\n" % os.WEXITSTATUS(self.status) + self.output
124 return self.id() + ": succeeded\n"
127 "Torturetest editing GUI,"
129 # All site parameters except protocol
130 field_map = ('host', 'mailaddr', 'username', 'password', \
131 'options', 'capabilities', 'recognition', 'comment')
134 # Build the widget tree from the glade XML file.
135 self.wtree = gtk.glade.XML("torturetest.glade")
136 # Fill in initial values
137 self.combo = self.wtree.get_widget("combo1")
138 self.combo.set_popdown_strings(map(lambda x: x.comment, sitelist))
139 self.sslcheck = self.wtree.get_widget("ssl_checkbox")
140 self.site = sitelist[0]
141 self.display(self.site)
143 # Provide handlers for the widget tree's events
145 for key in ('on_torturetest_destroy',
146 'on_updatebutton_clicked',
147 'on_newbutton_clicked',
148 'on_testbutton_clicked',
149 'on_quitbutton_clicked',
150 'on_dumpbutton_clicked',
151 'on_combo_entry1_activate'):
152 mydict[key] = getattr(self, key)
153 self.wtree.signal_autoconnect(mydict)
def get_widget(self, widget):
    "Return the text of an Entry widget, given the widget or its name."
    # A plain string is taken to be a widget name and is resolved
    # through the glade widget tree; anything else is used as-is.
    target = widget
    if type(target) is str:
        target = self.wtree.get_widget(target)
    # Exact type match only: any widget whose type is not precisely
    # gtk.Entry falls through to the None result below.
    if type(target) == gtk.Entry:
        return target.get_text()
    # SpinButton and TextView readers are not implemented here.
    return None
169 def set_widget(self, name, exp):
170 "Set the value of a widget by name."
171 widget = self.wtree.get_widget(name)
172 if type(widget) == gtk.Entry:
174 elif type(widget) == gtk.SpinButton:
175 widget.set_value(exp)
176 elif type(widget) == gtk.TextView:
177 if not widget.get_buffer():
178 widget.set_buffer(gtk.TextBuffer())
179 widget.get_buffer().set_text(exp)
def display(self, site):
    "Load a site's attributes into the editing widgets."
    # Each field_map member is shown in the widget named "<member>_entry".
    for field in TortureGUI.field_map:
        value = getattr(site, field)
        self.set_widget(field + "_entry", value)
    # Activate the radio button for the site's protocol, deactivate the rest.
    for name in protocols:
        button = self.wtree.get_widget(name + "_radiobutton")
        button.set_active(site.protocol == name)
    # SSL counts as enabled unless the value is '' or the string 'False'.
    ssl_enabled = site.ssl != '' and site.ssl != 'False'
    self.sslcheck.set_active(int(ssl_enabled))
    # The combo box entry shows the site's comment.
    self.combo.entry.set_text(site.comment)
189 def update(self, site):
190 for member in TortureGUI.field_map:
191 setattr(site, member, self.get_widget(member + "_entry"))
192 for proto in protocols:
193 if self.wtree.get_widget(proto + "_radiobutton").get_active():
194 site.protocol = proto
195 if self.wtree.get_widget("ssl_checkbox").get_active():
201 def on_torturetest_destroy(self, obj):
203 def on_updatebutton_clicked(self, obj):
204 self.update(self.site)
206 if self.site.comment:
207 self.combo.entry.set_text(self.site.comment)
209 self.combo.entry.set_text(self.site.host)
210 def on_newbutton_clicked(self, obj):
212 sitelist = [TestSite()] + sitelist
213 self.site = sitelist[0]
214 self.display(self.site)
215 self.combo.entry.set_text("")
def on_testbutton_clicked(self, obj):
    "Test-button handler: run a fetch on the current site and print its output."
    current = self.site
    current.fetch(logfile=False)
    # Single parenthesised argument: prints the same text under the
    # Python 2 print statement as the unparenthesised form.
    print(current.output)
219 def on_quitbutton_clicked(self, obj):
221 def on_dumpbutton_clicked(self, obj):
224 def on_combo_entry1_activate(self, obj):
225 key = self.combo.entry.get_text()
226 for site in sitelist:
227 if site.comment.find(key) > -1:
229 self.display(self.site)
232 if __name__ == "__main__":
233 # Start by reading in the sitelist
234 ifp = open("testsites")
239 line = ifp.readline()
242 elif line[0] in ("#", "\n"):
246 sitelist.append(TestSite(line))
248 print "Error on line %d" % linect
251 (options, arguments) = getopt.getopt(sys.argv[1:], "dfp:tigvse")
253 for (switch, value) in options:
255 # Prettyprint the sitelist
256 map(lambda x: sys.stdout.write(x.prettyprint() + "%%\n"), sitelist)
259 # Dump the sitelist as a .fetchmailrc file
260 map(lambda x: sys.stdout.write(x.entryprint()), sitelist)
263 # Probe a single site
265 for site in sitelist:
266 if `site`.find(value) > -1:
267 selected.append(site)
271 # Dump the sitelist in HTML table form
272 map(lambda x: sys.stdout.write(x.tableprint()), sitelist)
275 # Dump the ids of the sitelist
276 map(lambda x: sys.stdout.write(x.id() + "\n"), sitelist)
280 for site in sitelist:
281 print "Sending test mail to " + site.id()
284 # Send test mail to each site
285 sys.stdout.write("Delaying to give the test mail time to land...")
287 sys.stdout.write("here we go:\n")
290 # Display the test output
293 # Dump recognition strings of all tested servers as a Python tuple
294 print "(" + ",\n".join(map(lambda x: repr(x.recognition), filter(lambda x: x.recognition, sitelist))) + ")"
300 # If no options, run the torture test
302 failures = successes = 0
303 os.system("fetchmail -q")
304 for site in sitelist:
305 print "Testing " + site.id()
314 # OK, summarize results
315 print "\n%d successes and %d failures out of %d tests" \
316 % (successes, failures, len(sitelist))
319 print "Bad status was returned on the following sites:"
320 for site in sitelist:
322 sys.stdout.write(site.explain() + "\n")
323 except KeyboardInterrupt: