import datetime

from ldaptor.protocols.ldap import proxy
from ldaptor.protocols.ldap import ldapsyntax, ldaperrors
from ldaptor.protocols import pureldap

[docs]class ServiceBindingProxy(proxy.Proxy): """ An LDAP proxy that handles non-anonymous bind requests specially. BindRequests are intercepted and authentication is attempted against each configured service. This authentication is performed against a separate LDAP entry, found by searching for entries with - objectClass: serviceSecurityObject - owner: the DN of the original bind attempt - cn: the service name. starting at the identity-base as configured in the config file. Finally, if the authentication does not succeed against any of the configured services, the proxy can fallback to passing the bind request to the real server. """ services = [] fallback = False def __init__(self, services=None, fallback=None, *a, **kw): """ Initialize the object. @param services: List of service names to try to bind against. @param fallback: If none of the attempts to authenticate against a specific service succeeded, whether to fall back to the normal LDAP bind mechanism. """ proxy.Proxy.__init__(self, *a, **kw) if services is not None: = list(services) if fallback is not None: self.fallback = fallback def _startSearch(self, request, controls, reply): services = list( baseDN = self.config.getIdentityBaseDN() e = ldapsyntax.LDAPEntryWithClient(client=self.client, dn=baseDN) d = self._tryService(services, e, request, controls, reply) d.addCallback(self._maybeFallback, request, controls, reply) return d def _maybeFallback(self, entry, request, controls, reply): if entry is not None: msg = pureldap.LDAPBindResponse( resultCode=ldaperrors.Success.resultCode, matchedDN=request.dn ) return msg elif self.fallback: self.handleUnknown(request, controls, reply) else: msg = pureldap.LDAPBindResponse( resultCode=ldaperrors.LDAPInvalidCredentials.resultCode ) return msg
[docs] def timestamp(self): now = return now.strftime("%Y%m%d%H%M%SZ")
def _tryService(self, services, baseEntry, request, controls, reply): try: serviceName = services.pop(0) except IndexError: return None timestamp = self.timestamp() d = filterObject=pureldap.LDAPFilter_and( [ pureldap.LDAPFilter_equalityMatch( attributeDesc=pureldap.LDAPAttributeDescription("objectClass"), assertionValue=pureldap.LDAPAssertionValue( "serviceSecurityObject" ), ), pureldap.LDAPFilter_equalityMatch( attributeDesc=pureldap.LDAPAttributeDescription("owner"), assertionValue=pureldap.LDAPAssertionValue(request.dn), ), pureldap.LDAPFilter_equalityMatch( attributeDesc=pureldap.LDAPAttributeDescription("cn"), assertionValue=pureldap.LDAPAssertionValue(serviceName), ), pureldap.LDAPFilter_or( [ # no time pureldap.LDAPFilter_not( pureldap.LDAPFilter_present("validFrom") ), # or already valid pureldap.LDAPFilter_lessOrEqual( attributeDesc=pureldap.LDAPAttributeDescription( "validFrom" ), assertionValue=pureldap.LDAPAssertionValue(timestamp), ), ] ), pureldap.LDAPFilter_or( [ # no time pureldap.LDAPFilter_not( pureldap.LDAPFilter_present("validUntil") ), # or still valid pureldap.LDAPFilter_greaterOrEqual( attributeDesc=pureldap.LDAPAttributeDescription( "validUntil" ), assertionValue=pureldap.LDAPAssertionValue(timestamp), ), ] ), ] ), attributes=("1.1",), ) def _gotEntries(entries): if not entries: return None assert len(entries) == 1 # TODO e = entries[0] d = e.bind(request.auth) return d d.addCallback(_gotEntries) d.addCallbacks( callback=self._loopIfNone, callbackArgs=(services, baseEntry, request, controls, reply), errback=self._loopIfBindError, errbackArgs=(services, baseEntry, request, controls, reply), ) return d def _loopIfNone(self, r, *a, **kw): if r is None: d = self._tryService(*a, **kw) return d else: return r def _loopIfBindError(self, fail, *a, **kw): fail.trap(ldaperrors.LDAPInvalidCredentials) d = self._tryService(*a, **kw) return d fail_LDAPBindRequest = pureldap.LDAPBindResponse
[docs] def handle_LDAPBindRequest(self, request, controls, reply): if request.version != 3: raise ldaperrors.LDAPProtocolError( "Version %u not supported" % request.version ) self.checkControls(controls) if request.dn == "": # anonymous bind return self.handleUnknown(request, controls, reply) else: d = self._whenConnected(self._startSearch, request, controls, reply) return d
if __name__ == "__main__": """ Demonstration LDAP proxy; passes all requests to localhost:389. """ from twisted.internet import reactor, protocol from twisted.python import log import sys log.startLogging(sys.stderr) from ldaptor import config factory = protocol.ServerFactory() cfg = config.LDAPConfig( serviceLocationOverrides={ "": ("localhost", 389), } ) factory.protocol = lambda: ServiceBindingProxy( config=cfg, services=[ "svc1", "svc2", "svc3", ], fallback=True, ) reactor.listenTCP(10389, factory)