Pileus Git - ~andy/fetchmail/blob - torturetest.py
Easy bug fixes for this round.
[~andy/fetchmail] / torturetest.py
1 #!/usr/bin/env python
2
3 import sys, getopt, os, smtplib, commands, time
4
class TestSite:
    """One mail-server account exercised by the torture test.

    A site is built from a single colon-separated line holding exactly
    nine fields, in this order: host, mailname, userid, password, proto,
    options, capabilities, version, comment.  After a fetch() the
    results are kept in the `status` and `output` attributes.
    """

    # Scratch file that receives the generated fetchmail configuration.
    # NOTE(review): the name is predictable and lives in a shared
    # directory; consider the tempfile module if that matters here.
    temp = "/usr/tmp/torturestest-%d" % os.getpid()

    def __init__(self, line):
        """Initialize site data from the external representation.

        Raises ValueError when `line` does not split into exactly nine
        colon-separated fields.
        """
        (self.host, self.mailname, self.userid, self.password,
         self.proto, self.options, self.capabilities, self.version,
         self.comment) = line.strip().split(":")
        # An empty mail name means "same as the login name".
        if not self.mailname:
            self.mailname = self.userid
        # Test results, filled in by fetch().
        self.status = None
        self.output = None

    def allattrs(self):
        "Return a tuple consisting of all this site's attributes."
        return (self.host, self.mailname, self.userid, self.password,
                self.proto, self.options, self.capabilities,
                self.version, self.comment)

    def __repr__(self):
        "Return the external representation of this site's data."
        return ":".join(self.allattrs())

    def prettyprint(self):
        "Prettyprint a site entry in human-readable form."
        return "Host: %s\n" \
               "Mail To: %s\n" \
               "Userid: %s\n" \
               "Password: %s\n" \
               "Protocol: %s\n" \
               "Options: %s\n" \
               "Capabilities: %s\n" \
               "Version: %s\n" \
               "Comment: %s\n" \
               % self.allattrs()

    def entryprint(self):
        "Return a .fetchmailrc entry corresponding to a site entry."
        return "poll %s-%s via %s with proto %s %s\n" \
               "   user %s there with password '%s' is esr here\n\n" \
               % (self.host, self.proto, self.host, self.proto,
                  self.options, self.userid, self.password)

    def tableprint(self):
        "Return an HTML server-type table entry."
        return "<tr><td>%s: %s</td><td>%s</td>\n" \
               % (self.proto, self.comment, self.capabilities)

    def id(self):
        "Identify this site."
        rep = "%s %s at %s" % (self.proto, self.version, self.host)
        if self.capabilities:
            rep += " (" + self.capabilities + ")"
        if self.options:
            rep += " using " + self.options
        return rep

    def testmail(self, n=None):
        """Send test mail to the site through the SMTP server on localhost.

        When `n` is given, its repr is prepended to the message body so
        individual test messages can be told apart.
        """
        fromaddr = "esr@thyrsus.com"
        toaddr = "%s@%s" % (self.mailname, self.host)
        msg = ("From: %s\r\nTo: %s\r\n\r\n" % (fromaddr, toaddr))
        if n is not None:
            msg += repr(n) + ": "
        msg += "Test mail collected from %s.\n" % (self.id(),)
        server = smtplib.SMTP("localhost")
        server.sendmail(fromaddr, toaddr, msg)
        server.quit()

    def fetch(self):
        """Run a mail fetch on this site.

        Writes a one-site fetchmail configuration to TestSite.temp, runs
        fetchmail against it, and stores the command's status and
        combined output in self.status / self.output.  The scratch file
        is removed afterwards, whether or not the run succeeded.
        """
        # Open outside the try block: if this fails there is no file to
        # clean up, and the open() error must not be masked by a failing
        # os.remove().
        ofp = open(TestSite.temp, "w")
        try:
            try:
                ofp.write('defaults mda "(echo; echo \'From torturetest\' `date`; cat) >>TEST.LOG"\n')
                # Use this instance's own entry, not whatever global
                # happens to be called "site" in the calling script.
                ofp.write(self.entryprint())
            finally:
                ofp.close()
            (self.status, self.output) = commands.getstatusoutput("fetchmail -d0 -v -f - <%s"%TestSite.temp)
        finally:
            os.remove(TestSite.temp)

    def failed(self):
        """Did we have a test failure here?

        A run fails when the process did not exit normally, or when it
        exited with a status greater than 1 (exit statuses 0 and 1 are
        both treated as success).
        """
        if not os.WIFEXITED(self.status):
            # Abnormal termination (e.g. a signal) is a failure too.
            return True
        return os.WEXITSTATUS(self.status) > 1

    def explain(self):
        "Explain the status of the last test."
        if not os.WIFEXITED(self.status):
            return self.id() + ": abnormal termination\n"
        elif os.WEXITSTATUS(self.status) > 1:
            return self.id() + ": %d\n" % os.WEXITSTATUS(self.status) + self.output
        else:
            return self.id() + ": succeeded\n"
97
if __name__ == "__main__":
    # Start by reading in the sitelist.  Blank lines and lines starting
    # with "#" are skipped; every other line must parse as a TestSite.
    sitelist = []
    ifp = open("testsites")
    try:
        linect = 0
        for line in ifp:
            linect += 1
            if line[0] in ("#", "\n"):
                continue
            try:
                sitelist.append(TestSite(line))
            except ValueError:
                # A malformed entry is an error: report it on stderr and
                # exit non-zero so callers and scripts can notice.
                sys.stderr.write("Error on line %d\n" % linect)
                sys.exit(1)
    finally:
        ifp.close()

    try:
        (options, arguments) = getopt.getopt(sys.argv[1:], "dfp:tigvs")
    except getopt.GetoptError:
        sys.stderr.write("torturetest: %s\n" % sys.exc_info()[1])
        sys.exit(2)

    # Options are handled in command-line order.  The dump options exit
    # immediately; -p, -g and -v adjust state and then continue on to
    # the torture test below.
    verbose = 0
    for (switch, value) in options:
        if switch == "-d":
            # Prettyprint the sitelist
            for site in sitelist:
                sys.stdout.write(site.prettyprint() + "%%\n")
            sys.exit(0)
        elif switch == "-f":
            # Dump the sitelist as a .fetchmailrc file
            for site in sitelist:
                sys.stdout.write(site.entryprint())
            sys.exit(0)
        elif switch == "-p":
            # Probe only the sites whose external representation
            # contains the given substring
            selected = []
            for site in sitelist:
                if repr(site).find(value) > -1:
                    selected.append(site)
            sitelist = selected
            # Fall through to the remaining options and the test run
        elif switch == "-t":
            # Dump the sitelist in HTML table form
            for site in sitelist:
                sys.stdout.write(site.tableprint())
            sys.exit(0)
        elif switch == "-i":
            # Dump the ids of the sitelist
            for site in sitelist:
                sys.stdout.write(site.id() + "\n")
            sys.exit(0)
        elif switch == "-g":
            # Send numbered test mail to each site
            i = 1
            for site in sitelist:
                print("Sending test mail to " + site.id())
                site.testmail(i)
                i += 1
            sys.stdout.write("Delaying to give the test mail time to land...")
            # Flush so the notice is visible before we go to sleep.
            sys.stdout.flush()
            time.sleep(5)
            sys.stdout.write("here we go:\n")
            # Fall through to the remaining options and the test run
        elif switch == "-v":
            # Display the test output
            verbose = 1
        elif switch == "-s":
            # Dump version strings of all tested servers as a Python tuple
            versions = [repr(x.version) for x in sitelist if x.version]
            print("(" + ",\n".join(versions) + ")")
            sys.exit(0)

    # If no dump option exited above, run the torture test
    try:
        failures = successes = 0
        # NOTE(review): presumably stops any running fetchmail daemon
        # before testing -- confirm against the fetchmail manual.
        os.system("fetchmail -q")
        for site in sitelist:
            print("Testing " + site.id())
            site.fetch()
            if verbose:
                print(site.output)
            if site.failed():
                failures += 1
            else:
                successes += 1

        # OK, summarize results
        print("\n%d successes and %d failures out of %d tests"
              % (successes, failures, len(sitelist)))

        if failures:
            print("Bad status was returned on the following sites:")
            for site in sitelist:
                if site.failed():
                    sys.stdout.write(site.explain() + "\n")
    except KeyboardInterrupt:
        print("Interrupted.")
188
189 # end