3 import sys, getopt, os, smtplib, commands, time, gtk, gtk.glade
4 os.environ['LC_ALL'] = 'C'
7 # Mail address to use in From: header, user@host.domain format
8 fromaddr = "ma+fetchmail@dt.e-technik.uni-dortmund.de"
9 # local user name to receive mail as
11 # server to inject mail into
12 smtpserver = "localhost"
13 # delay after sending mail
15 ### END OF REQUIRED CUSTOMIZATION
17 # only used for the GUI:
18 protocols = ('POP3', 'APOP', 'IMAP',)
21 temp = "/usr/tmp/torturestest-%d" % os.getpid()
23 def __init__(self, line=None):
24 "Initialize site data from the external representation."
32 self.capabilities = ""
36 (self.host, self.mailaddr, self.username, self.password, \
37 self.protocol, self.ssl, self.options, self.capabilities, \
38 self.recognition, self.comment) = \
39 line.strip().split(":")
41 self.mailaddr = self.username
47 "Return a tuple consisting of all this site's attributes."
48 return (self.host, self.mailaddr, self.username, self.password, \
49 self.protocol, self.ssl, self.options, self.capabilities, \
50 self.recognition, self.comment)
53 "Return the external representation of this site's data."
54 return ":".join(self.allattrs())
56 def prettyprint(self):
57 "Prettyprint a site entry in human-readable form."
65 "Capabilities: %s\n" \
71 "Print a .fetchmailrc entry corresponding to a site entry."
72 rep = "poll %s-%s via %s with proto %s %s\n" \
73 " user %s there with password '%s' is %s here" \
74 % (self.host,self.protocol,self.host,self.protocol,self.options,self.username,self.password,localuser)
75 if self.ssl and self.ssl != 'False':
81 "Print an HTML server-type table entry."
82 return "<tr><td>%s: %s</td><td>%s</td>\n" \
83 % (self.protocol, self.comment, self.capabilities)
87 rep = "%s %s at %s" % (self.protocol, self.recognition, self.host)
89 rep += " (" + self.capabilities + ")"
91 rep += " using " + self.options
94 def testmail(self, n=None):
95 "Send test mail to the site."
96 server = smtplib.SMTP(smtpserver)
97 if self.mailaddr.find("@") > -1:
98 toaddr = self.mailaddr
100 toaddr = "%s@%s" % (self.mailaddr, self.host)
101 msg = ("From: %s\r\nTo: %s\r\n\r\n" % (fromaddr, toaddr))
104 msg += "Test mail collected from %s.\n" % (self.id(),)
105 server.sendmail(fromaddr, toaddr, msg)
108 def fetch(self, logfile=False):
109 "Run a mail fetch on this site."
111 ofp = open(TestSite.temp, "w")
113 mda = "(echo \'From torturetest\' `date`;cat -;echo) >>TEST.LOG"
116 ofp.write('defaults mda "%s"\n' % mda)
117 ofp.write(self.entryprint())
119 (self.status, self.output) = commands.getstatusoutput("fetchmail -d0 -v -f - <%s"%TestSite.temp)
121 os.system("cat " + TestSite.temp)
123 os.remove(TestSite.temp)
126 "Did we have a test failure here?"
127 return os.WIFEXITED(self.status) and os.WEXITSTATUS(self.status) > 1
130 "Explain the status of the last test."
131 if not os.WIFEXITED(self.status):
132 return self.id() + ": abnormal termination\n"
133 elif os.WEXITSTATUS(self.status) > 1:
134 return self.id() + ": %d\n" % os.WEXITSTATUS(self.status) + self.output
136 return self.id() + ": succeeded\n"
139 "Torturetest editing GUI,"
141 # All site parameters except protocol
142 field_map = ('host', 'mailaddr', 'username', 'password', \
143 'options', 'capabilities', 'recognition', 'comment')
146 # Build the widget tree from the glade XML file.
147 self.wtree = gtk.glade.XML("torturetest.glade")
148 # Fill in initial values
149 self.combo = self.wtree.get_widget("combo1")
150 self.combo.set_popdown_strings(map(lambda x: x.comment, sitelist))
151 self.sslcheck = self.wtree.get_widget("ssl_checkbox")
152 self.site = sitelist[0]
153 self.display(self.site)
155 # Provide handlers for the widget tree's events
157 for key in ('on_torturetest_destroy',
158 'on_updatebutton_clicked',
159 'on_newbutton_clicked',
160 'on_testbutton_clicked',
161 'on_quitbutton_clicked',
162 'on_dumpbutton_clicked',
163 'on_combo_entry1_activate'):
164 mydict[key] = getattr(self, key)
165 self.wtree.signal_autoconnect(mydict)
def get_widget(self, widget):
    "Return the text held by a widget, given the widget itself or its name."
    # A plain string is taken to be a widget name and is resolved through
    # the glade widget tree first.
    if type(widget) == str:
        widget = self.wtree.get_widget(widget)
    # NOTE(review): the exact-type comparison is kept on purpose.  set_widget
    # handles gtk.Entry and gtk.SpinButton as separate cases, which an
    # isinstance() test may not do if SpinButton derives from Entry --
    # confirm against the PyGTK class hierarchy before changing it.
    kind = type(widget)
    if kind == gtk.Entry:
        return widget.get_text()
    # Reading back gtk.SpinButton and gtk.TextView widgets was sketched in
    # earlier commented-out code but is not implemented; every widget other
    # than a plain Entry therefore yields None.
    return None
181 def set_widget(self, name, exp):
182 "Set the value of a widget by name."
183 widget = self.wtree.get_widget(name)
184 if type(widget) == gtk.Entry:
186 elif type(widget) == gtk.SpinButton:
187 widget.set_value(exp)
188 elif type(widget) == gtk.TextView:
189 if not widget.get_buffer():
190 widget.set_buffer(gtk.TextBuffer())
191 widget.get_buffer().set_text(exp)
def display(self, site):
    "Load the attributes of a site entry into the GUI widgets."
    # Every attribute named in field_map has a widget called
    # "<attribute>_entry".
    for attr in TortureGUI.field_map:
        value = getattr(site, attr)
        self.set_widget(attr + "_entry", value)
    # Each protocol has its own radio button; it is switched on only when
    # it matches the site's protocol.
    for name in protocols:
        button = self.wtree.get_widget(name + "_radiobutton")
        button.set_active(site.protocol == name)
    # SSL counts as enabled unless the field is empty or the string 'False'.
    ssl_enabled = site.ssl not in ('', 'False')
    self.sslcheck.set_active(int(ssl_enabled))
    self.combo.entry.set_text(site.comment)
201 def update(self, site):
202 for member in TortureGUI.field_map:
203 setattr(site, member, self.get_widget(member + "_entry"))
204 for proto in protocols:
205 if self.wtree.get_widget(proto + "_radiobutton").get_active():
206 site.protocol = proto
207 if self.wtree.get_widget("ssl_checkbox").get_active():
213 def on_torturetest_destroy(self, obj):
215 def on_updatebutton_clicked(self, obj):
216 self.update(self.site)
218 if self.site.comment:
219 self.combo.entry.set_text(self.site.comment)
221 self.combo.entry.set_text(self.site.host)
222 def on_newbutton_clicked(self, obj):
224 sitelist = [TestSite()] + sitelist
225 self.site = sitelist[0]
226 self.display(self.site)
227 self.combo.entry.set_text("")
def on_testbutton_clicked(self, obj):
    "Handle the Test button: fetch from the current site and print the output."
    # fetch() records the command output on the site object; False is
    # passed for its logfile parameter.  The obj argument supplied by the
    # signal connection is not used.
    self.site.fetch(False)
    # A parenthesized single argument prints identically under the Python 2
    # print statement and the Python 3 print function, so this line no
    # longer breaks parsing on Python 3.
    print(self.site.output)
231 def on_quitbutton_clicked(self, obj):
233 def on_dumpbutton_clicked(self, obj):
236 def on_combo_entry1_activate(self, obj):
237 key = self.combo.entry.get_text()
238 for site in sitelist:
239 if site.comment.find(key) > -1:
241 self.display(self.site)
244 if __name__ == "__main__":
245 # Start by reading in the sitelist
246 ifp = open("testsites")
251 line = ifp.readline()
254 elif line[0] in ("#", "\n"):
258 sitelist.append(TestSite(line))
260 print "Error on line %d" % linect
263 (options, arguments) = getopt.getopt(sys.argv[1:], "dfp:tigvse")
265 for (switch, value) in options:
267 # Prettyprint the sitelist
268 map(lambda x: sys.stdout.write(x.prettyprint() + "%%\n"), sitelist)
271 # Dump the sitelist as a .fetchmailrc file
272 map(lambda x: sys.stdout.write(x.entryprint()), sitelist)
275 # Probe a single site
277 for site in sitelist:
278 if `site`.find(value) > -1:
279 selected.append(site)
283 # Dump the sitelist in HTML table form
284 map(lambda x: sys.stdout.write(x.tableprint()), sitelist)
287 # Dump the ids of the sitelist
288 map(lambda x: sys.stdout.write(x.id() + "\n"), sitelist)
292 for site in sitelist:
293 print "Sending test mail to " + site.id()
296 # Send test mail to each site
297 print "Delaying to give the test mail time to land..."
302 # Display the test output
305 # Dump recognition strings of all tested servers as a Python tuple
306 print "(" + ",\n".join(map(lambda x: repr(x.recognition), filter(lambda x: x.recognition, sitelist))) + ")"
312 # If no options, run the torture test
314 failures = successes = 0
315 os.system("fetchmail -q")
316 for site in sitelist:
317 print "Testing " + site.id()
326 # OK, summarize results
327 print "\n%d successes and %d failures out of %d tests" \
328 % (successes, failures, len(sitelist))
331 print "Bad status was returned on the following sites:"
332 for site in sitelist:
334 sys.stdout.write(site.explain() + "\n")
335 except KeyboardInterrupt: