Controlling Connection Bursts in Twisted Applications
I’ve been using Twisted’s Deferred and DeferredList with great pleasure, but recently I found I needed to limit the number of tasks/connections that run at once: my application simply spawned too many connections too quickly, and killed my little home router.
In this post I want to demonstrate how to do that using Twisted’s DeferredSemaphore.
Here’s a sample application which doesn’t use DeferredSemaphore:
from twisted.internet import defer
from twisted.internet import reactor
from twisted.names.client import lookupMailExchange

domains = [
    'google.com',
    'yahoo.com',
    'microsoft.com',
    'facebook.com',
    'twitter.com'
]

def showMailExchanges(results):
    for result in results:
        # DeferredList returns a (success, result) tuple for each deferred, e.g.:
        # (True, ([<RR name=twitter.com type=MX class=IN ttl=164s auth=False>,
        #          <RR name=twitter.com type=MX class=IN ttl=164s auth=False>,
        #          <RR name=twitter.com type=MX class=IN ttl=164s auth=False>,
        #          <RR name=twitter.com type=MX class=IN ttl=164s auth=False>,
        #          <RR name=twitter.com type=MX class=IN ttl=164s auth=False>],
        #         [], []))
        ans, auth, add = result[1]  # DNS results are always a 3-part tuple
        for x in ans:
            # !s turns the DNS name objects into str so the width spec applies
            print("{0!s:15} {1!s}".format(x.name, x.payload.name))

deferreds = []
for domain in domains:
    d = lookupMailExchange(domain)
    deferreds.append(d)
dl = defer.DeferredList(deferreds)
dl.addCallback(showMailExchanges)
reactor.run()
# Example output:
# google.com      aspmx.l.google.com
# google.com      alt2.aspmx.l.google.com
# google.com      alt3.aspmx.l.google.com
# google.com      alt1.aspmx.l.google.com
# google.com      alt4.aspmx.l.google.com
# yahoo.com       b.mx.mail.yahoo.com
# yahoo.com       d.mx.mail.yahoo.com
# yahoo.com       e.mx.mail.yahoo.com
# yahoo.com       f.mx.mail.yahoo.com
# yahoo.com       g.mx.mail.yahoo.com
# yahoo.com       h.mx.mail.yahoo.com
# yahoo.com       i.mx.mail.yahoo.com
# yahoo.com       j.mx.mail.yahoo.com
# yahoo.com       k.mx.mail.yahoo.com
# yahoo.com       l.mx.mail.yahoo.com
# yahoo.com       m.mx.mail.yahoo.com
# yahoo.com       n.mx.mail.yahoo.com
# yahoo.com       a.mx.mail.yahoo.com
# microsoft.com   mail.messaging.microsoft.com
# facebook.com    smtpin.mx.facebook.com
# twitter.com     alt2.aspmx.l.google.com
# twitter.com     ASPMX2.GOOGLEMAIL.com
# twitter.com     ASPMX3.GOOGLEMAIL.com
# twitter.com     aspmx.l.google.com
# twitter.com     alt1.aspmx.l.google.com
Say we want to limit the number of lookups that are performed at once. Enter Twisted’s DeferredSemaphore:
from twisted.internet import defer
from twisted.internet import reactor
from twisted.names.client import lookupMailExchange

domains = [
    'google.com',
    'yahoo.com',
    'microsoft.com',
    'facebook.com',
    'twitter.com'
]

def showMailExchanges(results):
    for result in results:
        ans, auth, add = result[1]
        for x in ans:
            print("{0!s:15} {1!s}".format(x.name, x.payload.name))

deferreds = []
sem = defer.DeferredSemaphore(2)  # New
for domain in domains:
    d = sem.run(lookupMailExchange, domain)  # New
    deferreds.append(d)
dl = defer.DeferredList(deferreds)
dl.addCallback(showMailExchanges)
reactor.run()
Neat. We only had to change two lines.
What Twisted’s “asynchronous semaphore” does is restrict the number of calls made through it that can be in progress at once. In this case, we specify that at most two calls started via sem.run (and the Deferred chains they return) can do work at the same time; the rest wait in a queue until a slot frees up. That means our script won’t try to look up the mail exchanges of more than two domains at once.
If we want to implement an application-wide semaphore, we can write a helper function that returns a global semaphore:
from twisted.internet import defer
from twisted.internet import reactor
from twisted.names.client import lookupMailExchange

domains = [
    'google.com',
    'yahoo.com',
    'microsoft.com',
    'facebook.com',
    'twitter.com'
]

def showMailExchanges(results):
    for result in results:
        ans, auth, add = result[1]
        for x in ans:
            print("{0!s:15} {1!s}".format(x.name, x.payload.name))

theSemaphore = None

def getSemaphore():
    global theSemaphore
    if theSemaphore is None:
        theSemaphore = defer.DeferredSemaphore(2)
    return theSemaphore

deferreds = []
sem = getSemaphore()
for domain in domains:
    d = sem.run(lookupMailExchange, domain)
    deferreds.append(d)
dl = defer.DeferredList(deferreds)
dl.addCallback(showMailExchanges)
reactor.run()
Now, as long as every part of the application gets its semaphore from this module’s getSemaphore function and starts its work through sem.run, the number of those tasks running at once is restricted to two. Awesome.
We can even write a “semaphore map” to do away with the boilerplate looping and adding:
from twisted.internet import defer
from twisted.internet import reactor
from twisted.names.client import lookupMailExchange

domains = [
    'google.com',
    'yahoo.com',
    'microsoft.com',
    'facebook.com',
    'twitter.com'
]

def showMailExchanges(results):
    for result in results:
        ans, auth, add = result[1]
        for x in ans:
            print("{0!s:15} {1!s}".format(x.name, x.payload.name))

theSemaphore = None

def getSemaphore():
    global theSemaphore
    if theSemaphore is None:
        theSemaphore = defer.DeferredSemaphore(2)
    return theSemaphore

def semMap(function, things, *args, **kwargs):
    assert callable(function)
    sem = getSemaphore()
    deferreds = []
    for x in things:
        d = sem.run(function, x, *args, **kwargs)
        deferreds.append(d)
    dl = defer.DeferredList(deferreds)
    return dl

dl = semMap(lookupMailExchange, domains)
dl.addCallback(showMailExchanges)
reactor.run()
Pretty sweet, huh?