]> Pileus Git - ~andy/fetchmail/blob - dist-tools/test/torturetest.py
Credit John Beck's fixes.
[~andy/fetchmail] / dist-tools / test / torturetest.py
1 #!/usr/bin/env python
2
3 import sys, getopt, os, smtplib, commands, time, gtk, gtk.glade
# Force the "C" locale in this process's environment (inherited by the
# commands the script launches).
os.environ['LC_ALL'] = 'C'

### CUSTOMIZE THESE!
# Mail address to use in the From: header of the test mails,
# in user@host.domain format.
fromaddr = "ma+fetchmail@dt.e-technik.uni-dortmund.de"
# Local user name to receive mail as (the "is ... here" part of each
# generated fetchmailrc entry).
localuser = "emma"
# SMTP server the test mails are injected into.
smtpserver = "localhost"
# Seconds to wait after sending the test mails before fetching them.
delay = 30
### END OF REQUIRED CUSTOMIZATION

# Only used for the GUI: each protocol name NAME has a matching
# "NAME_radiobutton" widget.
protocols = ('POP3', 'APOP', 'IMAP',)
19
class TestSite:
    """One mail-server account exercised by the torture test.

    The external representation of a site is a single line of ten
    colon-separated fields, in this order: host, mailaddr, username,
    password, protocol, ssl, options, capabilities, recognition, comment.
    """

    # Legacy scratch-file path.  fetch() no longer writes here (it creates a
    # private file with tempfile.mkstemp instead); the attribute is kept only
    # so that code referring to TestSite.temp keeps working.
    temp = "/usr/tmp/torturestest-%d" % os.getpid()

    def __init__(self, line=None):
        """Initialize site data from the external representation.

        line -- one site entry (ten colon-separated fields); surrounding
                whitespace is stripped.  With no line, every field is "".

        Raises ValueError when line does not split into exactly ten fields.
        """
        self.host = ""
        self.mailaddr = ""
        self.username = ""
        self.password = ""
        self.protocol = ""
        self.ssl = ""
        self.options = ""
        self.capabilities = ""
        self.recognition = ""
        self.comment = ""
        if line:
            (self.host, self.mailaddr, self.username, self.password,
             self.protocol, self.ssl, self.options, self.capabilities,
             self.recognition, self.comment) = line.strip().split(":")
        # An empty mail address defaults to the login name.
        if not self.mailaddr:
            self.mailaddr = self.username
        # Test results, filled in by fetch().
        self.status = None
        self.output = None

    def allattrs(self):
        "Return a tuple consisting of all this site's attributes."
        return (self.host, self.mailaddr, self.username, self.password,
                self.protocol, self.ssl, self.options, self.capabilities,
                self.recognition, self.comment)

    def __repr__(self):
        "Return the external representation of this site's data."
        return ":".join(self.allattrs())

    def prettyprint(self):
        "Prettyprint a site entry in human-readable form."
        template = ("Host: %s\n"
                    "Mail To: %s\n"
                    "Username: %s\n"
                    "Password: %s\n"
                    "Protocol: %s\n"
                    "SSL: %s\n"
                    "Options: %s\n"
                    "Capabilities: %s\n"
                    "Recognition: %s\n"
                    "Comment: %s\n")
        return template % self.allattrs()

    def entryprint(self):
        """Return a .fetchmailrc entry corresponding to a site entry.

        The entry maps the remote user to the module-level localuser.  The
        password is inserted between single quotes without any escaping.
        """
        rep = "poll %s-%s via %s with proto %s %s\n" \
              "   user %s there with password '%s' is %s here" \
              % (self.host, self.protocol, self.host, self.protocol,
                 self.options, self.username, self.password, localuser)
        # The ssl field is a string; both "" and "False" mean "no SSL".
        if self.ssl and self.ssl != 'False':
            rep += " options ssl"
        rep += "\n\n"
        return rep

    def tableprint(self):
        "Return an HTML server-type table entry."
        return "<tr><td>%s: %s</td><td>%s</td>\n" \
               % (self.protocol, self.comment, self.capabilities)

    def id(self):
        "Identify this site."
        rep = "%s %s at %s" % (self.protocol, self.recognition, self.host)
        if self.capabilities:
            rep += " (" + self.capabilities + ")"
        if self.options:
            rep += " using " + self.options
        return rep

    def testmail(self, n=None):
        """Send test mail to the site.

        n -- optional sequence marker; when given, its repr() is put in
             front of the message text.

        The mail goes through the module-level smtpserver, from fromaddr.
        A mailaddr without "@" is qualified with the site's host name.
        """
        if "@" in self.mailaddr:
            toaddr = self.mailaddr
        else:
            toaddr = "%s@%s" % (self.mailaddr, self.host)
        msg = "From: %s\r\nTo: %s\r\n\r\n" % (fromaddr, toaddr)
        if n is not None:
            msg += repr(n) + ": "
        msg += "Test mail collected from %s.\n" % (self.id(),)
        server = smtplib.SMTP(smtpserver)
        try:
            server.sendmail(fromaddr, toaddr, msg)
            server.quit()
        finally:
            # Release the socket even when sendmail()/quit() raised;
            # close() is harmless after a successful quit().
            server.close()

    def fetch(self, logfile=False):
        """Run a mail fetch on this site.

        logfile -- when true, fetched mail is appended to TEST.LOG behind a
                   "From torturetest <date>" separator line; otherwise the
                   MDA is plain "cat".

        Writes a temporary run-control file (MDA default plus this site's
        entry), feeds it to "fetchmail -d0 -v -f -" on standard input, and
        stores the result in self.status and self.output.  On a nonzero
        status the run-control file is echoed with cat.  The file is always
        removed afterwards.
        """
        import tempfile

        # The run-control file contains the account password, so create it
        # privately and unpredictably instead of at a fixed, guessable path.
        (fd, rcfile) = tempfile.mkstemp(prefix="torturetest-")
        try:
            ofp = os.fdopen(fd, "w")
            try:
                if logfile:
                    mda = "(echo \'From torturetest\'  `date`;cat -;echo) >>TEST.LOG"
                else:
                    mda = 'cat'
                ofp.write('defaults mda "%s"\n' % mda)
                ofp.write(self.entryprint())
            finally:
                ofp.close()
            (self.status, self.output) = commands.getstatusoutput("fetchmail -d0 -v -f - <%s" % rcfile)
            if self.status:
                os.system("cat " + rcfile)
        finally:
            os.remove(rcfile)

    def failed(self):
        """Did we have a test failure here?

        Exit statuses 0 and 1 are not failures.  Anything else is,
        including a run that did not exit normally.  A site that has not
        been fetched yet (status None) has not failed.
        """
        if self.status is None:
            return False
        if not os.WIFEXITED(self.status):
            # Abnormal termination; explain() reports it as such.
            return True
        return os.WEXITSTATUS(self.status) > 1

    def explain(self):
        "Explain the status of the last test."
        if self.status is None:
            return self.id() + ": not tested\n"
        if not os.WIFEXITED(self.status):
            return self.id() + ": abnormal termination\n"
        exitcode = os.WEXITSTATUS(self.status)
        if exitcode > 1:
            return self.id() + ": %d\n" % exitcode + self.output
        return self.id() + ": succeeded\n"
137
class TortureGUI:
    """Torturetest editing GUI.

    Built from the glade file "torturetest.glade"; edits the entries of
    the module-level sitelist.  Constructing an instance runs the GTK
    main loop and returns only after the loop has been quit.
    """

    # All site parameters except protocol and ssl; each NAME is edited
    # through the widget called "NAME_entry".
    field_map = ('host', 'mailaddr', 'username', 'password',
                 'options', 'capabilities', 'recognition', 'comment')

    def __init__(self):
        # Build the widget tree from the glade XML file.
        self.wtree = gtk.glade.XML("torturetest.glade")
        # An empty site list would leave nothing to display; start with a
        # blank entry instead of failing on sitelist[0].
        if not sitelist:
            sitelist.append(TestSite())
        # Fill in initial values
        self.combo = self.wtree.get_widget("combo1")
        self.combo.set_popdown_strings([site.comment for site in sitelist])
        self.sslcheck = self.wtree.get_widget("ssl_checkbox")
        self.site = sitelist[0]
        self.display(self.site)

        # Provide handlers for the widget tree's events
        handlers = {}
        for key in ('on_torturetest_destroy',
                    'on_updatebutton_clicked',
                    'on_newbutton_clicked',
                    'on_testbutton_clicked',
                    'on_quitbutton_clicked',
                    'on_dumpbutton_clicked',
                    'on_combo_entry1_activate'):
            handlers[key] = getattr(self, key)
        self.wtree.signal_autoconnect(handlers)

        gtk.main()
        # After the main loop ends, dump the site that was being edited.
        print(repr(self.site))

    def get_widget(self, widget):
        """Get the value of a widget's contents.

        widget -- a widget object, or the name of one in the widget tree.

        Returns the text of a gtk.Entry; any other widget yields None.
        """
        if isinstance(widget, str):
            widget = self.wtree.get_widget(widget)
        # Exact type test on purpose: gtk.SpinButton derives from gtk.Entry
        # and must not be treated as a plain text entry.
        if type(widget) is gtk.Entry:
            return widget.get_text()
        # NOTE: reading gtk.SpinButton / gtk.TextView values is not
        # implemented.
        return None

    def set_widget(self, name, exp):
        "Set the value of a widget by name."
        widget = self.wtree.get_widget(name)
        # Exact type tests on purpose (gtk.SpinButton derives from
        # gtk.Entry); widgets of any other type are left untouched.
        kind = type(widget)
        if kind is gtk.Entry:
            widget.set_text(exp)
        elif kind is gtk.SpinButton:
            widget.set_value(exp)
        elif kind is gtk.TextView:
            if not widget.get_buffer():
                widget.set_buffer(gtk.TextBuffer())
            widget.get_buffer().set_text(exp)

    def display(self, site):
        "Load the widgets from the given site's attributes."
        for member in TortureGUI.field_map:
            self.set_widget(member + "_entry", getattr(site, member))
        for proto in protocols:
            self.wtree.get_widget(proto + "_radiobutton").set_active(site.protocol == proto)
        # The ssl attribute is a string; both '' and 'False' mean "off".
        self.sslcheck.set_active(int(site.ssl != '' and site.ssl != 'False'))
        self.combo.entry.set_text(site.comment)

    def update(self, site):
        "Store the current widget contents into the given site."
        for member in TortureGUI.field_map:
            setattr(site, member, self.get_widget(member + "_entry"))
        for proto in protocols:
            if self.wtree.get_widget(proto + "_radiobutton").get_active():
                site.protocol = proto
        # self.sslcheck is the "ssl_checkbox" widget looked up in __init__.
        if self.sslcheck.get_active():
            site.ssl = "True"
        else:
            site.ssl = "False"

    # Housekeeping
    def on_torturetest_destroy(self, obj):
        "Quit the main loop when the window is destroyed."
        gtk.main_quit()

    def on_updatebutton_clicked(self, obj):
        "Copy the widgets into the current site, print it, relabel the combo."
        self.update(self.site)
        print(self.site)
        if self.site.comment:
            self.combo.entry.set_text(self.site.comment)
        else:
            self.combo.entry.set_text(self.site.host)

    def on_newbutton_clicked(self, obj):
        "Put a blank site at the front of the site list and display it."
        global sitelist
        sitelist = [TestSite()] + sitelist
        self.site = sitelist[0]
        self.display(self.site)
        self.combo.entry.set_text("")

    def on_testbutton_clicked(self, obj):
        "Run a fetch on the current site and print its output."
        self.site.fetch(False)
        print(self.site.output)

    def on_quitbutton_clicked(self, obj):
        "Quit the main loop."
        gtk.main_quit()

    def on_dumpbutton_clicked(self, obj):
        "Print the external representation of the current site."
        print(repr(self.site))

    def on_combo_entry1_activate(self, obj):
        "Select and display the first site whose comment contains the combo text."
        key = self.combo.entry.get_text()
        for site in sitelist:
            if key in site.comment:
                self.site = site
                self.display(self.site)
                break
243
if __name__ == "__main__":
    # Start by reading in the sitelist: one site entry per line of the
    # file "testsites"; lines starting with "#" and empty lines are skipped.
    sitelist = []
    linect = 0
    ifp = open("testsites")
    try:
        for line in ifp:
            linect += 1
            if line[0] in ("#", "\n"):
                continue
            try:
                sitelist.append(TestSite(line))
            except ValueError:
                # Wrong number of colon-separated fields.  Report it and
                # exit with a failure status rather than claiming success.
                sys.stderr.write("Error on line %d\n" % linect)
                sys.exit(1)
    finally:
        ifp.close()

    # Options are handled in command-line order: the dump options act on
    # the site list as it stands at that point and exit immediately.
    (options, arguments) = getopt.getopt(sys.argv[1:], "dfp:tigvse")
    verbose = 0
    for (switch, value) in options:
        if switch == "-d":
            # Prettyprint the sitelist
            for site in sitelist:
                sys.stdout.write(site.prettyprint() + "%%\n")
            sys.exit(0)
        elif switch == "-f":
            # Dump the sitelist as a .fetchmailrc file
            for site in sitelist:
                sys.stdout.write(site.entryprint())
            sys.exit(0)
        elif switch == "-p":
            # Probe selected sites only: keep those whose external
            # representation contains the option's argument.
            sitelist = [site for site in sitelist if value in repr(site)]
            # Fall through to the remaining options / the test run
        elif switch == "-t":
            # Dump the sitelist in HTML table form
            for site in sitelist:
                sys.stdout.write(site.tableprint())
            sys.exit(0)
        elif switch == "-i":
            # Dump the ids of the sitelist
            for site in sitelist:
                sys.stdout.write(site.id() + "\n")
            sys.exit(0)
        elif switch == "-g":
            # Send numbered test mail to each site
            i = 1
            for site in sitelist:
                print("Sending test mail to " + site.id())
                site.testmail(i)
                i += 1
            print("Delaying to give the test mail time to land...")
            time.sleep(delay)
            print("Here we go:")
            # Fall through to the remaining options / the test run
        elif switch == "-v":
            # Display the test output
            verbose = 1
        elif switch == "-s":
            # Dump recognition strings of all tested servers as a Python tuple
            recognized = [repr(site.recognition)
                          for site in sitelist if site.recognition]
            print("(" + ",\n".join(recognized) + ")")
            sys.exit(0)
        elif switch == "-e":
            # Run the site editor GUI instead of the tests
            TortureGUI()
            sys.exit(0)

    # If no option has exited above, run the torture test
    try:
        failures = successes = 0
        os.system("fetchmail -q")
        for site in sitelist:
            print("Testing " + site.id())
            site.fetch(True)
            if verbose:
                print(site.output)
            if site.failed():
                failures += 1
            else:
                successes += 1

        # OK, summarize results
        print("\n%d successes and %d failures out of %d tests"
              % (successes, failures, len(sitelist)))

        if failures:
            print("Bad status was returned on the following sites:")
            for site in sitelist:
                if site.failed():
                    sys.stdout.write(site.explain() + "\n")
    except KeyboardInterrupt:
        print("Interrupted.")
337
338 # end