Source code for ldaptor.checkers

from zope.interface import implementer
from twisted.cred import checkers, credentials, error
from twisted.internet import reactor
from twisted.python import failure

from ldaptor import ldapfilter, config
from ldaptor.protocols.ldap import ldapconnector, ldapclient, ldapsyntax, ldaperrors


[docs]def makeFilter(name, template=None): filter = None try: filter = ldapfilter.parseFilter(name) except ldapfilter.InvalidLDAPFilter: try: filter = ldapfilter.parseFilter("(" + name + ")") except ldapfilter.InvalidLDAPFilter: if template is not None: try: filter = ldapfilter.parseFilter(template % {"name": name}) except ldapfilter.InvalidLDAPFilter: pass return filter
[docs]@implementer(checkers.ICredentialsChecker) class LDAPBindingChecker: """ The avatarID returned is an LDAPEntry. """ credentialInterfaces = (credentials.IUsernamePassword,) def __init__(self, cfg): self.config = cfg def _valid(self, result, entry): matchedDN, serverSaslCreds = result return entry def _found(self, results, credentials): if not results: return failure.Failure(error.UnauthorizedLogin("TODO 1")) assert len(results) == 1 entry = results[0] d = entry.client.bind(str(entry.dn), credentials.password) d.addCallback(self._valid, entry) return d def _connected(self, client, filt, credentials): base = ldapsyntax.LDAPEntry(client, self.config.getIdentityBaseDN()) d = base.search( filterObject=filt, sizeLimit=1, attributes=[""], # TODO no attributes ) d.addCallback(self._found, credentials) return d
[docs] def requestAvatarId(self, credentials): try: baseDN = self.config.getIdentityBaseDN() except config.MissingBaseDNError as e: return failure.Failure( error.UnauthorizedLogin("Disabled due configuration error: %s." % e) ) if not credentials.username: return failure.Failure(error.UnauthorizedLogin("I don't support anonymous")) filtText = self.config.getIdentitySearch(credentials.username) try: filt = ldapfilter.parseFilter(filtText) except ldapfilter.InvalidLDAPFilter: return failure.Failure(error.UnauthorizedLogin("Couldn't create filter")) c = ldapconnector.LDAPClientCreator(reactor, ldapclient.LDAPClient) d = c.connect(baseDN, self.config.getServiceLocationOverrides()) d.addCallback(self._connected, filt, credentials) def _err(reason): reason.trap( ldaperrors.LDAPInvalidCredentials, # this happens with slapd 2.1.30 when binding # with DN but no password ldaperrors.LDAPUnwillingToPerform, ) return failure.Failure(error.UnauthorizedLogin()) d.addErrback(_err) return d