LDAP Servers¶
An LDAP directory information tree (DIT) is a highly specialized database with entries arranged in a tree-like structure.
File-System LDAP DIT¶
A minimal LDAP DIT that stores entries in the local file system
Code¶
First, a module that defines our DIT entries– schema.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 | COUNTRY = (
"dc=fr",
{
"objectClass": ["dcObject", "country"],
"dc": ["fr"],
"description": ["French country 2 letters iso description"],
},
)
COMPANY = (
"dc=example",
{
"objectClass": ["dcObject", "organization"],
"dc": ["example"],
"description": ["My organisation"],
"o": ["Example, Inc"],
},
)
PEOPLE = (
"ou=people",
{
"ou": ["people"],
"description": ["People from Example Inc"],
"objectclass": ["organizationalunit"],
},
)
USERS = [
(
"uid=yoen",
{
"objectClass": ["people", "inetOrgPerson"],
"cn": ["Yoen Van der Weld"],
"sn": ["Van der Weld"],
"givenName": ["Yoen"],
"uid": ["yoen"],
"mail": ["/home/yoen/mailDir"],
"userPassword": ["secret"],
},
),
(
"uid=esteban",
{
"objectClass": ["people", "inetOrgPerson"],
"cn": ["Esteban Garcia Marquez"],
"sn": ["Garcia Marquez"],
"givenName": ["Esteban"],
"uid": ["esteban"],
"mail": ["/home/esteban/mailDir"],
"userPassword": ["secret2"],
},
),
(
"uid=mohamed",
{
"objectClass": ["people", "inetOrgPerson"],
"cn": ["Mohamed Al Ghâlib"],
"sn": ["Al Ghâlib"],
"givenName": ["mohamed"],
"uid": ["mohamed"],
"mail": ["/home/mohamed/mailDir"],
"userPassword": ["secret3"],
},
),
]
|
Next, the server code– ldaptor_basic.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 | #! /usr/bin/env python
"""
Testing a simple ldaptor ldap server
Base on an example by Gaston TJEBBES aka "tonthon":
http://tonthon.blogspot.com/2011/02/ldaptor-ldap-with-twisted-server-side.html
"""
import tempfile, sys
from twisted.application import service
from twisted.internet import reactor
from twisted.internet.protocol import ServerFactory
from twisted.python.components import registerAdapter
from twisted.python import log
from ldaptor.interfaces import IConnectedLDAPEntry
from ldaptor.protocols.ldap.ldapserver import LDAPServer
from ldaptor.ldiftree import LDIFTreeEntry
from schema import COUNTRY, COMPANY, PEOPLE, USERS
class Tree:
def __init__(self):
dirname = tempfile.mkdtemp(".ldap", "test-server", "/tmp")
self.db = LDIFTreeEntry(dirname)
self.init_db()
def init_db(self):
"""
Add subtrees to the top entry
top->country->company->people
"""
country = self.db.addChild(COUNTRY[0], COUNTRY[1])
company = country.addChild(COMPANY[0], COMPANY[1])
people = company.addChild(PEOPLE[0], PEOPLE[1])
for user in USERS:
people.addChild(user[0], user[1])
class LDAPServerFactory(ServerFactory):
"""
Our Factory is meant to persistently store the ldap tree
"""
protocol = LDAPServer
def __init__(self, root):
self.root = root
def buildProtocol(self, addr):
proto = self.protocol()
proto.debug = self.debug
proto.factory = self
return proto
if __name__ == "__main__":
if len(sys.argv) == 2:
port = int(sys.argv[1])
else:
port = 8080
# First of all, to show logging info in stdout :
log.startLogging(sys.stderr)
# We initialize our tree
tree = Tree()
# When the ldap protocol handle the ldap tree,
# it retrieves it from the factory adapting
# the factory to the IConnectedLDAPEntry interface
# So we need to register an adapter for our factory
# to match the IConnectedLDAPEntry
registerAdapter(lambda x: x.root, LDAPServerFactory, IConnectedLDAPEntry)
# Run it !!
factory = LDAPServerFactory(tree.db)
factory.debug = True
application = service.Application("ldaptor-server")
myService = service.IServiceCollection(application)
reactor.listenTCP(port, factory)
reactor.run()
|
LDAP Server which allows BIND with UPN¶
The LDAP server implemented by Microsoft Active Directory allows using the UPN as the BIND DN.
It is possible to implement something similar using ldaptor.
Below is a proof-of-concept implementation, which should not be used for production as it has an heuristic method for detecting which BIND DN is an UPN.
handle_LDAPBindRequest is the method called when a BIND request is received.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 | """
An ldaptor LDAP server which can authenticate based on UPN, as AD does.
The LDAP entry needs to have the `userPrincipalName` attribute set.
dn: uid=bob,ou=people,dc=example,dc=org
objectclass: top
objectclass: person
objectClass: inetOrgPerson
uid: bob
cn: bobby
gn: Bob
sn: Roberts
mail: bob@example.org
homeDirectory: e:\\Users\\bob
userPassword: pass
userPrincipalName: bob@ad.example.org
"""
from ldaptor import interfaces
from ldaptor.protocols import pureldap
from ldaptor.protocols.ldap import distinguishedname, ldaperrors
from twisted.internet import defer
from ldaptor.protocols.ldap.ldapserver import LDAPServer
class LDAPServerWithUPNBind(LDAPServer):
"""
An LDAP server which support BIND using UPN similar to AD.
"""
_loginAttribute = b"userPrincipalName"
def handle_LDAPBindRequest(self, request, controls, reply):
if request.version != 3:
raise ldaperrors.LDAPProtocolError(
"Version %u not supported" % request.version
)
self.checkControls(controls)
if request.dn == b"":
# anonymous bind
self.boundUser = None
return pureldap.LDAPBindResponse(resultCode=0)
root = interfaces.IConnectedLDAPEntry(self.factory)
def _gotUPNResult(results):
if len(results) != 1:
# Not exactly one result, so this might not be an UNP.
return distinguishedname.DistinguishedName(request.dn)
# A single result, so the UPN might exist.
return results[0].dn
if b"@" in request.dn and b"," not in request.dn:
# This might be an UPN request.
filterText = b"(" + self._loginAttribute + b"=" + request.dn + b")"
d = root.search(filterText=filterText)
d.addCallback(_gotUPNResult)
else:
d = defer.succeed(distinguishedname.DistinguishedName(request.dn))
# Once the BIND DN is known, search for the LDAP entry.
d.addCallback(lambda dn: root.lookup(dn))
def _noEntry(fail):
"""
Called when the requested BIND DN was not found.
"""
fail.trap(ldaperrors.LDAPNoSuchObject)
return None
d.addErrback(_noEntry)
def _gotEntry(entry, auth):
"""
Called when the requested BIND DN was found.
"""
if entry is None:
raise ldaperrors.LDAPInvalidCredentials()
d = entry.bind(auth)
def _cb(entry):
"""
Called when BIND operation was successful.
"""
self.boundUser = entry
msg = pureldap.LDAPBindResponse(
resultCode=ldaperrors.Success.resultCode,
matchedDN=entry.dn.getText(),
)
return msg
d.addCallback(_cb)
return d
d.addCallback(_gotEntry, request.auth)
return d
|