]> Pileus Git - ~andy/fetchmail/blob - torturetest.py
First round of long-delayed bug fixes.
[~andy/fetchmail] / torturetest.py
1 #!/usr/bin/env python2
2
3 import sys, getopt, os, smtplib, commands, time, gtk, gtk.glade
4
5 protocols = ('POP3', 'APOP', 'IMAP',)
6
class TestSite:
    """One mail account used as a target by the fetchmail torture test.

    The external representation of a site is a single line of ten
    colon-separated fields, in the order returned by allattrs().
    After fetch() has run, `status` holds the wait()-style status of
    the fetchmail command and `output` holds the text it produced.
    """

    # Scratch file that fetch() writes the generated fetchmail
    # configuration into; the process ID keeps the name unique per run.
    temp = "/usr/tmp/torturestest-%d" % os.getpid()

    def __init__(self, line=None):
        """Initialize site data from the external representation.

        `line` is one colon-separated site entry; when it is omitted or
        empty every field starts out as the empty string.  A line that
        does not split into exactly ten fields raises ValueError.
        """
        self.host = ""
        self.mailaddr = ""
        self.username = ""
        self.password = ""
        self.protocol = ""
        self.ssl = ""
        self.options = ""
        self.capabilities = ""
        self.recognition = ""
        self.comment = ""
        if line:
            (self.host, self.mailaddr, self.username, self.password,
             self.protocol, self.ssl, self.options, self.capabilities,
             self.recognition, self.comment) = line.strip().split(":")
        # An empty mail address defaults to the login name.
        if not self.mailaddr:
            self.mailaddr = self.username
        # Test results, filled in by fetch().
        self.status = None
        self.output = None

    def allattrs(self):
        "Return a tuple consisting of all this site's attributes."
        return (self.host, self.mailaddr, self.username, self.password,
                self.protocol, self.ssl, self.options, self.capabilities,
                self.recognition, self.comment)

    def __repr__(self):
        "Return the external representation of this site's data."
        return ":".join(self.allattrs())

    def prettyprint(self):
        "Prettyprint a site entry in human-readable form."
        return "Host: %s\n" \
               "Mail To: %s\n" \
               "Username: %s\n" \
               "Password: %s\n" \
               "Protocol: %s\n" \
               "SSL: %s\n" \
               "Options: %s\n" \
               "Capabilities: %s\n" \
               "Recognition: %s\n" \
               "Comment: %s\n" \
               % self.allattrs()

    def entryprint(self):
        "Print a .fetchmailrc entry corresponding to a site entry."
        rep = "poll %s-%s via %s with proto %s %s\n" \
              "   user %s there with password '%s' is esr here" \
              % (self.host, self.protocol, self.host, self.protocol,
                 self.options, self.username, self.password)
        # The ssl field is text: both "" and "False" mean SSL is off.
        if self.ssl and self.ssl != 'False':
            rep += " options ssl"
        rep += "\n\n"
        return rep

    def tableprint(self):
        "Print an HTML server-type table entry."
        return "<tr><td>%s: %s</td><td>%s</td>\n" \
               % (self.protocol, self.comment, self.capabilities)

    def id(self):
        "Identify this site."
        rep = "%s %s at %s" % (self.protocol, self.recognition, self.host)
        if self.capabilities:
            rep += " (" + self.capabilities + ")"
        if self.options:
            rep += " using " + self.options
        return rep

    def testmail(self, n=None):
        """Send test mail to the site.

        The message is handed to the SMTP server on localhost.  When
        `n` is given, its repr is prepended to the message body so that
        individual test messages can be told apart.
        """
        server = smtplib.SMTP("localhost")
        fromaddr = "esr@thyrsus.com"
        # A bare mailbox name is qualified with the site's host name.
        if self.mailaddr.find("@") > -1:
            toaddr = self.mailaddr
        else:
            toaddr = "%s@%s" % (self.mailaddr, self.host)
        msg = ("From: %s\r\nTo: %s\r\n\r\n" % (fromaddr, toaddr))
        if n is not None:
            msg += repr(n) + ": "
        msg += "Test mail collected from %s.\n" % (self.id(),)
        server.sendmail(fromaddr, toaddr, msg)
        server.quit()

    def fetch(self, logfile=False):
        """Run a mail fetch on this site.

        Writes a one-site fetchmail configuration to TestSite.temp,
        feeds it to fetchmail on standard input and records the result
        in self.status and self.output.  When `logfile` is true the
        fetched mail is appended to TEST.LOG in the current directory;
        otherwise the MDA is plain `cat`.  If fetchmail returns a
        nonzero status the generated configuration is echoed as a
        debugging aid.  The scratch file is removed afterwards.
        """
        if logfile:
            mda = "(echo; echo 'From torturetest' `date`;cat) >>TEST.LOG"
        else:
            mda = 'cat'
        try:
            # The with-block closes the file even if a write fails.
            with open(TestSite.temp, "w") as ofp:
                ofp.write('defaults mda "%s"\n' % mda)
                ofp.write(self.entryprint())
            (self.status, self.output) = commands.getstatusoutput(
                "fetchmail -d0 -v -f - <%s" % TestSite.temp)
            if self.status:
                os.system("cat " + TestSite.temp)
        finally:
            # Only remove what actually exists, so that a failure to
            # create the scratch file is not masked by a second error.
            if os.path.exists(TestSite.temp):
                os.remove(TestSite.temp)

    def failed(self):
        """Did we have a test failure here?

        A run counts as failed when fetchmail terminated abnormally or
        exited with a status greater than 1.  Exit status 1 is treated
        as success -- presumably because fetchmail uses it to report
        "no mail to fetch"; confirm against the fetchmail manual.
        """
        return (not os.WIFEXITED(self.status)
                or os.WEXITSTATUS(self.status) > 1)

    def explain(self):
        "Explain the status of the last test."
        if not os.WIFEXITED(self.status):
            return self.id() + ": abnormal termination\n"
        elif os.WEXITSTATUS(self.status) > 1:
            return self.id() + ": %d\n" % os.WEXITSTATUS(self.status) + self.output
        else:
            return self.id() + ": succeeded\n"
125
class TortureGUI:
    "Interactive editor for the entries of the torture-test site list."

    # TestSite attributes edited through "<name>_entry" widgets.  The
    # protocol and ssl attributes are not listed here; they are shown
    # with radio buttons and a checkbox instead.
    field_map = ('host', 'mailaddr', 'username', 'password',
                 'options', 'capabilities', 'recognition', 'comment')

    def __init__(self):
        # Load the widget tree described by the glade XML file.
        self.wtree = gtk.glade.XML("torturetest.glade")
        # Populate the widgets, starting on the first site in the list.
        self.combo = self.wtree.get_widget("combo1")
        self.combo.set_popdown_strings([entry.comment for entry in sitelist])
        self.sslcheck = self.wtree.get_widget("ssl_checkbox")
        self.site = sitelist[0]
        self.display(self.site)

        # Connect every signal to the method carrying the same name.
        handler_names = ('on_torturetest_destroy',
                         'on_updatebutton_clicked',
                         'on_newbutton_clicked',
                         'on_testbutton_clicked',
                         'on_quitbutton_clicked',
                         'on_dumpbutton_clicked',
                         'on_combo_entry1_activate')
        handlers = {}
        for name in handler_names:
            handlers[name] = getattr(self, name)
        self.wtree.signal_autoconnect(handlers)

        # Blocks until the main loop is quit, then dumps the site that
        # was selected last.
        gtk.mainloop()
        print(repr(self.site))

    def get_widget(self, widget):
        "Return the text held by a widget, given the widget or its name."
        if type(widget) == type(""):
            widget = self.wtree.get_widget(widget)
        # Only plain entry widgets are read; for any other widget type
        # the method falls through and returns None.
        if type(widget) == gtk.Entry:
            return widget.get_text()

    def set_widget(self, name, exp):
        "Store a value into the widget with the given name."
        target = self.wtree.get_widget(name)
        kind = type(target)
        if kind == gtk.Entry:
            target.set_text(exp)
        elif kind == gtk.SpinButton:
            target.set_value(exp)
        elif kind == gtk.TextView:
            # Give the text view a buffer first if it has none yet.
            if not target.get_buffer():
                target.set_buffer(gtk.TextBuffer())
            target.get_buffer().set_text(exp)

    def display(self, site):
        "Copy the attributes of a site into the widgets."
        for member in TortureGUI.field_map:
            self.set_widget(member + "_entry", getattr(site, member))
        for proto in protocols:
            button = self.wtree.get_widget(proto + "_radiobutton")
            button.set_active(site.protocol == proto)
        # The ssl attribute is text: "" and "False" both mean unchecked.
        ssl_enabled = site.ssl != '' and site.ssl != 'False'
        self.sslcheck.set_active(int(ssl_enabled))
        self.combo.entry.set_text(site.comment)

    def update(self, site):
        "Copy the current widget contents back into a site."
        for member in TortureGUI.field_map:
            setattr(site, member, self.get_widget(member + "_entry"))
        for proto in protocols:
            button = self.wtree.get_widget(proto + "_radiobutton")
            if button.get_active():
                site.protocol = proto
        checked = self.wtree.get_widget("ssl_checkbox").get_active()
        site.ssl = "True" if checked else "False"

    # Signal handlers

    def on_torturetest_destroy(self, obj):
        gtk.mainquit()

    def on_updatebutton_clicked(self, obj):
        self.update(self.site)
        print(self.site)
        # Label the combo box with the comment, or the host without one.
        label = self.site.comment if self.site.comment else self.site.host
        self.combo.entry.set_text(label)

    def on_newbutton_clicked(self, obj):
        global sitelist
        # Put a fresh, empty site at the head of the list and edit it.
        sitelist = [TestSite()] + sitelist
        self.site = sitelist[0]
        self.display(self.site)
        self.combo.entry.set_text("")

    def on_testbutton_clicked(self, obj):
        self.site.fetch(False)
        print(self.site.output)

    def on_quitbutton_clicked(self, obj):
        gtk.mainquit()

    def on_dumpbutton_clicked(self, obj):
        print(repr(self.site))

    def on_combo_entry1_activate(self, obj):
        # Select the first site whose comment contains the typed text.
        wanted = self.combo.entry.get_text()
        for candidate in sitelist:
            if candidate.comment.find(wanted) >= 0:
                self.site = candidate
                self.display(self.site)
                break
231
if __name__ == "__main__":
    # Start by reading in the sitelist.  Each line of the "testsites"
    # file that is neither blank nor a "#" comment describes one site.
    # sitelist stays a module-level name because TortureGUI reads and
    # rebinds it.
    sitelist = []
    with open("testsites") as ifp:
        for (linect, line) in enumerate(ifp, 1):
            if line[0] in ("#", "\n"):
                continue
            try:
                sitelist.append(TestSite(line))
            except ValueError:
                # Wrong number of colon-separated fields.  Report on
                # stderr and exit nonzero so the error neither ends up
                # in redirected output nor looks like success.
                sys.stderr.write("Error on line %d\n" % linect)
                sys.exit(1)

    (options, arguments) = getopt.getopt(sys.argv[1:], "dfp:tigvse")
    verbose = 0
    # Options are handled in command-line order.  The dump options exit
    # immediately; -p, -g and -v only adjust state and let the torture
    # test below run once all options have been processed.
    for (switch, value) in options:
        if switch == "-d":
            # Prettyprint the sitelist
            for site in sitelist:
                sys.stdout.write(site.prettyprint() + "%%\n")
            sys.exit(0)
        elif switch == "-f":
            # Dump the sitelist as a .fetchmailrc file
            for site in sitelist:
                sys.stdout.write(site.entryprint())
            sys.exit(0)
        elif switch == "-p":
            # Probe only the sites whose external representation
            # contains the given text
            sitelist = [site for site in sitelist
                        if repr(site).find(value) > -1]
            # Fall through
        elif switch == "-t":
            # Dump the sitelist in HTML table form
            for site in sitelist:
                sys.stdout.write(site.tableprint())
            sys.exit(0)
        elif switch == "-i":
            # Dump the ids of the sitelist
            for site in sitelist:
                sys.stdout.write(site.id() + "\n")
            sys.exit(0)
        elif switch == "-g":
            # Send numbered test mail to each site
            for (i, site) in enumerate(sitelist, 1):
                print("Sending test mail to " + site.id())
                site.testmail(i)
            sys.stdout.write("Delaying to give the test mail time to land...")
            # No newline was written, so flush explicitly; otherwise the
            # message may not appear until after the delay.
            sys.stdout.flush()
            time.sleep(5)
            sys.stdout.write("here we go:\n")
            # Fall through
        elif switch == "-v":
            # Display the test output
            verbose = 1
        elif switch == "-s":
            # Dump recognition strings of all tested servers as a Python tuple
            recognized = [repr(site.recognition)
                          for site in sitelist if site.recognition]
            print("(" + ",\n".join(recognized) + ")")
            sys.exit(0)
        elif switch == "-e":
            # Run the site editor GUI instead of the test
            TortureGUI()
            sys.exit(0)

    # If no options, run the torture test
    try:
        failures = successes = 0
        os.system("fetchmail -q")
        for site in sitelist:
            print("Testing " + site.id())
            site.fetch(True)
            if verbose:
                print(site.output)
            if site.failed():
                failures += 1
            else:
                successes += 1

        # OK, summarize results
        print("\n%d successes and %d failures out of %d tests"
              % (successes, failures, len(sitelist)))

        if failures:
            print("Bad status was returned on the following sites:")
            for site in sitelist:
                if site.failed():
                    sys.stdout.write(site.explain() + "\n")
    except KeyboardInterrupt:
        print("Interrupted.")
325
326 # end