Source code for ldaptor.protocols.ldap.fetchschema

from ldaptor.protocols.ldap import ldaperrors, ldapsyntax
from ldaptor.protocols import pureldap
from ldaptor import schema
from ldaptor._encoder import to_bytes


def _fetchCb(subschemaSubentry, client):
    """Read and parse the schema stored in a subschema subentry.

    Performs a base-scope search on ``subschemaSubentry`` requesting the
    ``attributeTypes`` and ``objectClasses`` attributes, then parses every
    returned value into a schema description object.

    :param subschemaSubentry: DN of the subschema subentry to read.
    :param client: LDAP client handed to ``ldapsyntax.LDAPEntry``.
    :return: The Deferred returned by the search; it fires with a tuple
        ``(attributeTypes, objectClasses)`` holding lists of
        ``schema.AttributeTypeDescription`` and
        ``schema.ObjectClassDescription`` instances respectively.
    :raises ldaperrors.LDAPOther: (via the Deferred) if the search matches
        no entry or more than one entry.
    :raises AssertionError: (via the Deferred) if the entry carries no
        ``attributeTypes`` values.
    """
    entry = ldapsyntax.LDAPEntry(client=client, dn=subschemaSubentry)
    d = entry.search(
        scope=pureldap.LDAP_SCOPE_baseObject,
        sizeLimit=1,
        attributes=["attributeTypes", "objectClasses"],
    )

    def handleSearchResults(results):
        if not results:
            raise ldaperrors.LDAPOther("No such DN")
        if len(results) > 1:
            raise ldaperrors.LDAPOther("DN matched multiple entries")

        found = results[0]
        attributeTypes = [
            schema.AttributeTypeDescription(to_bytes(text))
            for text in found.get("attributeTypes", [])
        ]
        objectClasses = [
            schema.ObjectClassDescription(to_bytes(text))
            for text in found.get("objectClasses", [])
        ]
        # Raised explicitly instead of via ``assert`` so the check is not
        # stripped under ``python -O``; the exception type is kept as
        # AssertionError for backward compatibility.
        if not attributeTypes:
            raise AssertionError(
                "LDAP server doesn't give attributeTypes for subschemaSubentry dn=%s"
                % found.dn
            )
        return (attributeTypes, objectClasses)

    d.addCallback(handleSearchResults)
    return d


def fetch(client, baseObject):
    """Fetch the schema that applies to ``baseObject``.

    Performs a base-scope search on ``baseObject`` for its
    ``subschemaSubentry`` attribute, then chains ``_fetchCb`` to read the
    schema from the subentry that attribute names.

    :param client: LDAP client handed to ``ldapsyntax.LDAPEntry`` and
        passed on to ``_fetchCb``.
    :param baseObject: DN of the entry whose schema should be fetched.
    :return: The Deferred returned by the search, with the subentry lookup
        and ``_fetchCb`` chained onto it, so it fires with whatever
        ``_fetchCb`` produces.
    :raises ldaperrors.LDAPOther: (via the Deferred) if the search matches
        no entry or more than one entry.
    :raises AssertionError: (via the Deferred) if the entry has no
        ``subschemaSubentry`` attribute, or the attribute does not hold
        exactly one value.
    """
    entry = ldapsyntax.LDAPEntry(client=client, dn=baseObject)
    d = entry.search(
        scope=pureldap.LDAP_SCOPE_baseObject,
        sizeLimit=1,
        attributes=["subschemaSubentry"],
    )

    def handleSearchResults(results):
        if not results:
            raise ldaperrors.LDAPOther("No such DN")
        if len(results) > 1:
            raise ldaperrors.LDAPOther("DN matched multiple entries")

        found = results[0]
        # The checks below are raised explicitly instead of via ``assert``
        # so they are not stripped under ``python -O``; the exception type
        # is kept as AssertionError for backward compatibility.
        if "subschemaSubentry" not in found:
            raise AssertionError("No subschemaSubentry. TODO")
        subSchemas = found["subschemaSubentry"]
        if len(subSchemas) != 1:
            raise AssertionError(
                "More than one subschemaSubentry is not support yet. TODO"
            )
        # Exactly one value is present; take it without assuming the
        # container supports indexing.
        return next(iter(subSchemas))

    d.addCallback(handleSearchResults)
    d.addCallback(_fetchCb, client)
    return d