Pileus Git - ~andy/fetchmail/blob - torturetest.py
Ready to ship 6-2-0.
[~andy/fetchmail] / torturetest.py
1 #!/usr/bin/env python2
2
3 import sys, getopt, os, smtplib, commands, time
4
class TestSite:
    """One mail server entry from the site list, plus its last test result.

    A site is described by nine colon-separated fields (see __init__).
    After fetch() has run, `status` holds the wait status returned by
    commands.getstatusoutput() and `output` holds fetchmail's output; both
    are None until then.
    """

    # Scratch run-control file written by fetch() and fed to fetchmail.
    # NOTE(review): the name is predictable (it embeds only the pid) and the
    # file contains the site password in clear text; consider creating it
    # with tempfile.mkstemp() instead.
    temp = "/usr/tmp/torturestest-%d" % os.getpid()

    def __init__(self, line):
        """Initialize site data from the external representation.

        `line` is one record of exactly nine colon-separated fields: host,
        mailname, userid, password, proto, options, capabilities, version
        and comment.  Leading/trailing whitespace is stripped first.  An
        empty mailname defaults to the userid.

        Raises ValueError if the record does not have exactly nine fields.
        """
        (self.host, self.mailname, self.userid, self.password,
         self.proto, self.options, self.capabilities, self.version,
         self.comment) = line.strip().split(":")
        if not self.mailname:
            self.mailname = self.userid
        # Test results
        self.status = None
        self.output = None

    def allattrs(self):
        "Return a tuple consisting of all this site's attributes."
        return (self.host, self.mailname, self.userid, self.password,
                self.proto, self.options, self.capabilities,
                self.version, self.comment)

    def __repr__(self):
        "Return the external (colon-separated) representation of this site."
        return ":".join(self.allattrs())

    def prettyprint(self):
        "Return a site entry in human-readable form, one field per line."
        return "Host: %s\n" \
               "Mail To: %s\n" \
               "Userid: %s\n" \
               "Password: %s\n" \
               "Protocol: %s\n" \
               "Options: %s\n" \
               "Capabilities: %s\n" \
               "Version: %s\n" \
               "Comment: %s\n" \
               % self.allattrs()

    def entryprint(self):
        "Return a .fetchmailrc entry corresponding to a site entry."
        return "poll %s-%s via %s with proto %s %s\n" \
               "   user %s there with password '%s' is esr here\n\n" \
               % (self.host, self.proto, self.host, self.proto,
                  self.options, self.userid, self.password)

    def tableprint(self):
        "Return an HTML server-type table entry."
        return "<tr><td>%s: %s</td><td>%s</td>\n" \
               % (self.proto, self.comment, self.capabilities)

    def id(self):
        """Identify this site.

        Returns "<proto> <version> at <host>", followed by the capabilities
        in parentheses and " using <options>" when those fields are
        non-empty.
        """
        rep = "%s %s at %s" % (self.proto, self.version, self.host)
        if self.capabilities:
            rep += " (" + self.capabilities + ")"
        if self.options:
            rep += " using " + self.options
        return rep

    def testmail(self, n=None):
        """Send test mail to the site through the SMTP server on localhost.

        The recipient is `mailname` if it already contains an "@",
        otherwise mailname@host.  If `n` is not None its repr is prefixed
        to the message body as a sequence tag.
        """
        server = smtplib.SMTP("localhost")
        try:
            fromaddr = "esr@thyrsus.com"
            if "@" in self.mailname:
                toaddr = self.mailname
            else:
                toaddr = "%s@%s" % (self.mailname, self.host)
            msg = "From: %s\r\nTo: %s\r\n\r\n" % (fromaddr, toaddr)
            if n is not None:
                msg += repr(n) + ": "
            msg += "Test mail collected from %s.\n" % (self.id(),)
            server.sendmail(fromaddr, toaddr, msg)
        finally:
            # Close the SMTP session even when sending fails.
            server.quit()

    def fetch(self):
        """Run a mail fetch on this site.

        Writes a scratch run-control file (TestSite.temp) containing an mda
        default that appends each message to TEST.LOG plus this site's poll
        entry, runs fetchmail on it, and stores the resulting wait status
        and output in self.status / self.output.  The scratch file is
        removed afterwards, whether or not fetchmail could be run.
        """
        # Open outside the try block: if the file cannot be created there is
        # nothing to remove, and the original error must not be masked.
        ofp = open(TestSite.temp, "w")
        try:
            try:
                ofp.write('defaults mda "(echo; echo \'From torturetest\' `date`; cat) >>TEST.LOG"\n')
                # Use this instance's entry, not whatever global happens to
                # be named `site` in the calling script.
                ofp.write(self.entryprint())
            finally:
                ofp.close()
            (self.status, self.output) = commands.getstatusoutput(
                "fetchmail -d0 -v -f - <%s" % TestSite.temp)
        finally:
            os.remove(TestSite.temp)

    def failed(self):
        """Did we have a test failure here?

        Exit codes 0 and 1 count as success; any higher exit code is a
        failure.  A run that did not exit normally (e.g. it was killed by a
        signal) is also a failure, matching the "abnormal termination" case
        reported by explain().
        """
        if not os.WIFEXITED(self.status):
            return True
        return os.WEXITSTATUS(self.status) > 1

    def explain(self):
        "Explain the status of the last test as a one-line (or longer) string."
        if not os.WIFEXITED(self.status):
            return self.id() + ": abnormal termination\n"
        elif os.WEXITSTATUS(self.status) > 1:
            # Failure: report the exit code followed by fetchmail's output.
            return self.id() + ": %d\n" % os.WEXITSTATUS(self.status) + self.output
        else:
            return self.id() + ": succeeded\n"
100
if __name__ == "__main__":
    # Start by reading in the sitelist from the file "testsites" in the
    # current directory.  Lines starting with "#" and empty lines are
    # skipped; every other line must parse as a TestSite record.
    sitelist = []
    linect = 0
    ifp = open("testsites")
    try:
        for line in ifp:
            linect += 1
            if line[0] in ("#", "\n"):
                continue
            try:
                sitelist.append(TestSite(line))
            except ValueError:
                # Wrong number of fields: report on stderr and exit with a
                # failure status so the error cannot be mistaken for output.
                sys.stderr.write("Error on line %d\n" % linect)
                sys.exit(1)
    finally:
        ifp.close()

    # Options are handled in command-line order; the dump options exit
    # immediately, while -p, -g and -v continue on to the torture test.
    (options, arguments) = getopt.getopt(sys.argv[1:], "dfp:tigvs")
    verbose = 0
    for (switch, value) in options:
        if switch == "-d":
            # Prettyprint the sitelist
            for entry in sitelist:
                sys.stdout.write(entry.prettyprint() + "%%\n")
            sys.exit(0)
        elif switch == "-f":
            # Dump the sitelist as a .fetchmailrc file
            for entry in sitelist:
                sys.stdout.write(entry.entryprint())
            sys.exit(0)
        elif switch == "-p":
            # Probe only the sites whose external representation contains
            # the given substring, then continue with the remaining options.
            sitelist = [entry for entry in sitelist if value in repr(entry)]
        elif switch == "-t":
            # Dump the sitelist in HTML table form
            for entry in sitelist:
                sys.stdout.write(entry.tableprint())
            sys.exit(0)
        elif switch == "-i":
            # Dump the ids of the sitelist
            for entry in sitelist:
                sys.stdout.write(entry.id() + "\n")
            sys.exit(0)
        elif switch == "-g":
            # Send numbered test mail to each site, then continue.
            i = 1
            for entry in sitelist:
                print("Sending test mail to " + entry.id())
                entry.testmail(i)
                i += 1
            sys.stdout.write("Delaying to give the test mail time to land...")
            time.sleep(5)
            sys.stdout.write("here we go:\n")
        elif switch == "-v":
            # Display the test output
            verbose = 1
        elif switch == "-s":
            # Dump version strings of all tested servers as a Python tuple
            versions = [repr(entry.version) for entry in sitelist if entry.version]
            print("(" + ",\n".join(versions) + ")")
            sys.exit(0)

    # No option exited early (no options, or only -p/-g/-v were given):
    # run the torture test.
    try:
        failures = successes = 0
        # Ask any fetchmail already running in the background to quit first.
        os.system("fetchmail -q")
        for site in sitelist:
            print("Testing " + site.id())
            site.fetch()
            if verbose:
                print(site.output)
            if site.failed():
                failures += 1
            else:
                successes += 1

        # OK, summarize results
        print("\n%d successes and %d failures out of %d tests"
              % (successes, failures, len(sitelist)))

        if failures:
            print("Bad status was returned on the following sites:")
            for site in sitelist:
                if site.failed():
                    sys.stdout.write(site.explain() + "\n")
    except KeyboardInterrupt:
        print("Interrupted.")
191
192 # end