Source code for ldaptor.protocols.pureldap

# Copyright (C) 2001 Tommi Virtanen
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of version 2.1 of the GNU Lesser General Public
# License as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

"""LDAP protocol message conversion; no application logic here."""

import string


from ldaptor.protocols.pureber import (

    BERBoolean, BERDecoderContext, BEREnumerated, BERInteger, BERNull,
    BEROctetString, BERSequence, BERSequenceOf, BERSet, BERStructured,

    CLASS_APPLICATION, CLASS_CONTEXT,

    berDecodeMultiple, berDecodeObject, int2berlen,
    )
from ldaptor._encoder import to_bytes, repr_converter

# Next message id to hand out; advanced by alloc_ldap_message_id().
next_ldap_message_id = 1


def alloc_ldap_message_id():
    """Return the current message id and advance the module-level counter."""
    global next_ldap_message_id
    allocated = next_ldap_message_id
    next_ldap_message_id += 1
    return allocated
def escape(s):
    """Escape the characters that are special in an LDAP filter string.

    Backslash, ``*``, ``(``, ``)`` and NUL are each replaced by a backslash
    followed by the character's two-digit hex code.
    """
    # The backslash must be handled first, otherwise the backslashes
    # introduced by the later substitutions would be escaped again.
    substitutions = (
        ('\\', r'\5c'),
        ('*', r'\2a'),
        ('(', r'\28'),
        (')', r'\29'),
        ('\0', r'\00'),
    )
    for char, replacement in substitutions:
        s = s.replace(char, replacement)
    return s
def binary_escape(s):
    """Return *s* with every character written as a backslash plus hex code.

    Each character becomes a backslash followed by its code point in
    lower-case hex, zero-padded to at least two digits.
    """
    return ''.join(['\\%02x' % ord(char) for char in s])
def smart_escape(s, threshold=0.30):
    """Escape *s*, choosing the escaping style from its content.

    If the fraction of characters outside ``string.printable`` exceeds
    *threshold*, every character is hex-escaped with ``binary_escape``;
    otherwise only the filter-special characters are escaped with
    ``escape``.

    An empty input is returned unchanged: there is nothing to escape, and
    computing the ratio would otherwise divide by zero.
    """
    if len(s) == 0:
        return s
    binary_count = sum(c not in string.printable for c in s)
    if float(binary_count) / float(len(s)) > threshold:
        return binary_escape(s)
    return escape(s)
class LDAPInteger(BERInteger):
    """Integer used in LDAP messages; adds nothing to BERInteger."""
class LDAPString(BEROctetString):
    """Octet string that remembers which escaping function to apply to it."""

    def __init__(self, *args, **kwargs):
        # 'escaper' is consumed here (defaulting to escape); all remaining
        # arguments are handed to the base class untouched.
        escaper = kwargs.pop('escaper', escape)
        self.escaper = escaper
        super().__init__(*args, **kwargs)
class LDAPAttributeValue(BEROctetString):
    """Attribute value; a BEROctetString with no added behaviour."""
class LDAPMessage(BERSequence):
    """
    Envelope holding a message id, a protocol operation and optional controls.

    Call toWire() to obtain the encoded form to send over the network.
    """

    id = None
    value = None

    @classmethod
    def fromBER(klass, tag, content, berdecoder=None):
        """Build a message from the already-extracted BER *content*.

        Controls, when present, are stored as
        ``(controlType, criticality, controlValue)`` tuples.
        """
        parts = berDecodeMultiple(content, berdecoder)
        message_id = parts[0].value
        operation = parts[1]
        controls = None
        if parts[2:]:
            controls = [
                (ctl.controlType, ctl.criticality, ctl.controlValue)
                for ctl in parts[2]
            ]
        # Nothing may follow the optional controls element.
        assert not parts[3:]
        return klass(id=message_id, value=operation, controls=controls,
                     tag=tag)

    def __init__(self, value=None, controls=None, id=None, tag=None):
        """Wrap *value*; a fresh message id is allocated when *id* is None."""
        BERSequence.__init__(self, value=[], tag=tag)
        assert value is not None
        self.id = alloc_ldap_message_id() if id is None else id
        self.value = value
        self.controls = controls

    def toWire(self):
        """
        This is the wire/encoded representation.
        """
        members = [BERInteger(self.id), self.value]
        if self.controls is not None:
            wrapped = [LDAPControl(*ctl) for ctl in self.controls]
            members.append(LDAPControls(wrapped))
        return BERSequence(members).toWire()

    def __repr__(self):
        fields = [
            'id=%r' % self.id,
            'value=%r' % self.value,
            'controls=%r' % self.controls,
        ]
        # The tag is only shown when it differs from the class default.
        if self.tag != self.__class__.tag:
            fields.append('tag=%d' % self.tag)
        return self.__class__.__name__ + '(' + ', '.join(fields) + ')'
class LDAPProtocolOp:
    """Common base of the LDAP protocol operation classes."""

    def __init__(self):
        """Nothing to initialise at this level."""

    def toWire(self):
        """Encode the operation; the base class always raises."""
        raise NotImplementedError()
class LDAPProtocolRequest(LDAPProtocolOp):
    """Base class of request operations."""

    # Default for requests; LDAPUnbindRequest and LDAPAbandonRequest set 0.
    needs_answer = 1
class LDAPProtocolResponse(LDAPProtocolOp):
    """Base class of response operations; adds nothing to LDAPProtocolOp."""
class LDAPBERDecoderContext_LDAPBindRequest(BERDecoderContext):
    """Decoder context for the context-tagged members of a bind request."""

    Identities = {
        # [0]: decoded as a plain octet string.
        CLASS_CONTEXT | 0x00: BEROctetString,
        # [3]: decoded as a sequence.
        CLASS_CONTEXT | 0x03: BERSequence,
    }
class LDAPBindRequest(LDAPProtocolRequest, BERSequence):
    """Bind request carrying either a simple password or SASL credentials."""

    tag = CLASS_APPLICATION | 0x00

    @classmethod
    def fromBER(klass, tag, content, berdecoder=None):
        """Decode version, DN and authentication choice from *content*."""
        fields = berDecodeMultiple(
            content,
            LDAPBERDecoderContext_LDAPBindRequest(fallback=berdecoder))

        credentials = fields[2]
        sasl = False
        auth = None
        if isinstance(credentials, BEROctetString):
            auth = credentials.value
        elif isinstance(credentials, BERSequence):
            # per https://ldap.com/ldapv3-wire-protocol-reference-bind/
            # Credentials are optional and not always provided
            if len(credentials.data) == 2:
                auth = (credentials[0].value, credentials[1].value)
            else:
                auth = (credentials[0].value, None)
            sasl = True

        return klass(version=fields[0].value,
                     dn=fields[1].value,
                     auth=auth,
                     tag=tag,
                     sasl=sasl)

    def __init__(self, version=None, dn=None, auth=None, tag=None,
                 sasl=False):
        """Constructor for LDAP Bind Request

        For sasl=False, pass a string password for 'auth'
        For sasl=True, pass a tuple of (mechanism, credentials) for 'auth'

        Defaults: version 3, empty DN, empty password.
        """
        LDAPProtocolRequest.__init__(self)
        BERSequence.__init__(self, [], tag=tag)
        self.version = 3 if version is None else version
        self.dn = '' if dn is None else dn
        if auth is None:
            auth = ''
            # An absent auth is only acceptable for a simple bind.
            assert (not sasl)
        self.auth = auth
        self.sasl = sasl

    def toWire(self):
        """Encode as version, DN and the [0] simple or [3] SASL choice."""
        if self.sasl:
            # since the credentials for SASL are optional, only send them
            # when they are set.
            if self.auth[1]:
                sasl_members = [BEROctetString(self.auth[0]),
                                BEROctetString(self.auth[1])]
            else:
                sasl_members = [BEROctetString(self.auth[0])]
            auth_ber = BERSequence(sasl_members, tag=CLASS_CONTEXT | 3)
        else:
            auth_ber = BEROctetString(self.auth, tag=CLASS_CONTEXT | 0)

        members = [
            BERInteger(self.version),
            BEROctetString(self.dn),
            auth_ber,
        ]
        return BERSequence(members, tag=self.tag).toWire()

    def __repr__(self):
        # Never show the secret itself: one '*' per element of self.auth.
        masked = '*' * len(self.auth)
        fields = [
            'version=%d' % self.version,
            'dn=%s' % repr(repr_converter(self.dn)),
            'auth=%s' % repr(repr_converter(masked)),
        ]
        if self.tag != self.__class__.tag:
            fields.append('tag=%d' % self.tag)
        fields.append('sasl=%s' % repr(self.sasl))
        return self.__class__.__name__ + '(' + ', '.join(fields) + ')'
class LDAPReferral(BERSequence):
    """Sequence carrying the context-specific tag 3."""

    tag = CLASS_CONTEXT | 0x03
class LDAPBERDecoderContext_LDAPSearchResultReference(BERDecoderContext):
    """Decoder context that decodes plain octet strings as LDAPString."""

    Identities = {
        BEROctetString.tag: LDAPString,
    }
class LDAPSearchResultReference(LDAPProtocolResponse, BERSequence):
    """Search result reference: a list of URIs."""

    tag = CLASS_APPLICATION | 0x13

    def __init__(self, uris=None, tag=None):
        """Store *uris*; it is required despite the ``None`` default."""
        LDAPProtocolResponse.__init__(self)
        BERSequence.__init__(self, value=[], tag=tag)
        assert uris is not None
        self.uris = uris

    @classmethod
    def fromBER(cls, tag, content, berdecoder=None):
        """Decode the URIs in *content* (octet strings become LDAPString)."""
        uris = berDecodeMultiple(
            content,
            LDAPBERDecoderContext_LDAPSearchResultReference(
                fallback=berdecoder))
        # Forward the decoded tag, as the other fromBER constructors in this
        # module do, instead of silently dropping it.
        return cls(uris=uris, tag=tag)

    def toWire(self):
        """Encode the URIs as a sequence carrying this object's tag."""
        return BERSequence(BERSequence(self.uris), tag=self.tag).toWire()

    def __repr__(self):
        # The tag is only shown when it differs from the class default.
        return '{}(uris={}{})'.format(
            self.__class__.__name__,
            repr([repr_converter(uri) for uri in self.uris]),
            ', tag={}'.format(self.tag)
            if self.tag != self.__class__.tag else '',
        )
class LDAPResult(LDAPProtocolResponse, BERSequence):
    """Result code, matched DN and error message shared by the responses."""

    @classmethod
    def fromBER(klass, tag, content, berdecoder=None):
        """Decode result code, matched DN and error message from *content*."""
        fields = berDecodeMultiple(
            content,
            LDAPBERDecoderContext_LDAPBindRequest(fallback=berdecoder))
        assert 3 <= len(fields) <= 4

        # TODO support referrals: an optional fourth element is accepted
        # but currently ignored.
        referral = None

        code, matched, message = fields[0], fields[1], fields[2]
        return klass(resultCode=code.value,
                     matchedDN=matched.value,
                     errorMessage=message.value,
                     referral=referral,
                     tag=tag)

    def __init__(self, resultCode=None, matchedDN=None, errorMessage=None,
                 referral=None, serverSaslCreds=None, tag=None):
        """Store the result fields.

        *resultCode* is required; *matchedDN* and *errorMessage* default to
        the empty string.
        """
        LDAPProtocolResponse.__init__(self)
        BERSequence.__init__(self, value=[], tag=tag)
        assert resultCode is not None
        self.resultCode = resultCode
        self.matchedDN = '' if matchedDN is None else matchedDN
        self.errorMessage = '' if errorMessage is None else errorMessage
        self.referral = referral
        self.serverSaslCreds = serverSaslCreds

    def toWire(self):
        """Encode the result; server SASL credentials are added when set."""
        assert self.referral is None  # TODO
        members = [
            BEREnumerated(self.resultCode),
            BEROctetString(self.matchedDN),
            BEROctetString(self.errorMessage),
        ]
        if self.serverSaslCreds:
            members.append(
                LDAPBindResponse_serverSaslCreds(self.serverSaslCreds))
        return BERSequence(members, tag=self.tag).toWire()

    def __repr__(self):
        # Only resultCode is always shown; the rest appear when truthy.
        fields = ['resultCode=%r' % self.resultCode]
        if self.matchedDN:
            fields.append('matchedDN=%r' % repr_converter(self.matchedDN))
        if self.errorMessage:
            fields.append(
                'errorMessage=%r' % repr_converter(self.errorMessage))
        if self.referral:
            fields.append('referral=%r' % self.referral)
        if self.tag != self.__class__.tag:
            fields.append('tag=%d' % self.tag)
        return self.__class__.__name__ + '(' + ', '.join(fields) + ')'
class LDAPBindResponse_serverSaslCreds(BEROctetString):
    """Octet string with context tag 7, holding server SASL credentials."""

    tag = CLASS_CONTEXT | 0x07

    def __repr__(self):
        name = self.__class__.__name__
        # The tag is only shown when it differs from the class default.
        if self.tag != self.__class__.tag:
            return name + "(value=%s, tag=%d)" % (self.value, self.tag)
        return name + "(value=%s)" % self.value
class LDAPBERDecoderContext_BindResponse(BERDecoderContext):
    """Decoder context that recognises the server SASL credentials tag."""

    Identities = {
        LDAPBindResponse_serverSaslCreds.tag:
            LDAPBindResponse_serverSaslCreds,
    }
class LDAPBindResponse(LDAPResult):
    """Bind response: an LDAPResult with optional server SASL credentials."""

    tag = CLASS_APPLICATION | 0x01

    resultCode = None
    matchedDN = None
    errorMessage = None
    referral = None
    serverSaslCreds = None

    @classmethod
    def fromBER(klass, tag, content, berdecoder=None):
        """Decode the result fields and optional credentials from *content*."""
        fields = berDecodeMultiple(
            content,
            LDAPBERDecoderContext_BindResponse(fallback=berdecoder))
        assert 3 <= len(fields) <= 4

        # The fourth element is optional and only used when it carries the
        # server SASL credentials tag.
        serverSaslCreds = None
        if len(fields) > 3 and isinstance(
                fields[3], LDAPBindResponse_serverSaslCreds):
            serverSaslCreds = fields[3].value

        # TODO support referrals
        referral = None

        return klass(resultCode=fields[0].value,
                     matchedDN=fields[1].value,
                     errorMessage=fields[2].value,
                     referral=referral,
                     serverSaslCreds=serverSaslCreds,
                     tag=tag)

    def __init__(self, resultCode=None, matchedDN=None, errorMessage=None,
                 referral=None, serverSaslCreds=None, tag=None):
        """Initialise via LDAPResult, forwarding every argument.

        *tag* is passed through to the base class; it used to be replaced
        by ``None``, which discarded any tag given by the caller.
        """
        LDAPResult.__init__(self,
                            resultCode=resultCode,
                            matchedDN=matchedDN,
                            errorMessage=errorMessage,
                            referral=referral,
                            serverSaslCreds=serverSaslCreds,
                            tag=tag)

    def __repr__(self):
        return LDAPResult.__repr__(self)
class LDAPUnbindRequest(LDAPProtocolRequest, BERNull):
    """Unbind request, encoded as a BER null with an application tag."""

    tag = CLASS_APPLICATION | 0x02
    # Overrides the request default of 1.
    needs_answer = 0

    def __init__(self, *args, **kwargs):
        """Initialise both bases; arguments are passed to BERNull."""
        LDAPProtocolRequest.__init__(self)
        BERNull.__init__(self, *args, **kwargs)

    def toWire(self):
        """Use BERNull's encoding rather than LDAPProtocolOp.toWire."""
        return BERNull.toWire(self)
class LDAPAttributeDescription(BEROctetString):
    """Attribute description; a BEROctetString with no added behaviour."""
class LDAPAttributeValueAssertion(BERSequence):
    """Pair of an attribute description and an assertion value."""

    @classmethod
    def fromBER(klass, tag, content, berdecoder=None):
        """Decode exactly two elements: description and assertion value."""
        pair = berDecodeMultiple(content, berdecoder)
        assert len(pair) == 2
        description, assertion = pair[0], pair[1]
        return klass(attributeDesc=description,
                     assertionValue=assertion,
                     tag=tag)

    def __init__(self, attributeDesc=None, assertionValue=None, tag=None,
                 escaper=escape):
        """Store both halves; *attributeDesc* is required.

        *escaper* is kept on the instance for subclasses that render the
        assertion as filter text.
        """
        BERSequence.__init__(self, value=[], tag=tag)
        assert attributeDesc is not None
        self.attributeDesc = attributeDesc
        self.assertionValue = assertionValue
        self.escaper = escaper

    def toWire(self):
        """Encode description and value as a sequence with this tag."""
        members = [self.attributeDesc, self.assertionValue]
        return BERSequence(members, tag=self.tag).toWire()

    def __repr__(self):
        fields = [
            'attributeDesc=%r' % (self.attributeDesc,),
            'assertionValue=%r' % (self.assertionValue,),
        ]
        # The tag is only shown when it differs from the class default.
        if self.tag != self.__class__.tag:
            fields.append('tag=%d' % self.tag)
        return self.__class__.__name__ + '(' + ', '.join(fields) + ')'
class LDAPFilter(BERStructured):
    """Structured BER base class used by LDAPFilter_not."""

    def __init__(self, tag=None):
        """Pass *tag* on to BERStructured."""
        BERStructured.__init__(self, tag=tag)
class LDAPFilterSet(BERSet):
    """Set of filters that compares equal regardless of member order."""

    @classmethod
    def fromBER(klass, tag, content, berdecoder=None):
        """Decode the member filters found in *content*."""
        members = berDecodeMultiple(
            content, LDAPBERDecoderContext_Filter(fallback=berdecoder))
        return klass(members, tag=tag)

    def __eq__(self, rhs):
        # Fast paths
        if self is rhs:
            return True
        if len(self) != len(rhs):
            return False

        def wire_form(member):
            return member.toWire()

        # Order-insensitive: both sides are sorted by their encoded form.
        return sorted(self, key=wire_form) == sorted(rhs, key=wire_form)
class LDAPFilter_and(LDAPFilterSet):
    """Filter set rendered with the ``&`` operator."""

    tag = CLASS_CONTEXT | 0x00

    def asText(self):
        """Return ``(&...)`` wrapping the text of every member filter."""
        members = [member.asText() for member in self]
        return '(&' + ''.join(members) + ')'
class LDAPFilter_or(LDAPFilterSet):
    """Filter set rendered with the ``|`` operator."""

    tag = CLASS_CONTEXT | 0x01

    def asText(self):
        """Return ``(|...)`` wrapping the text of every member filter."""
        members = [member.asText() for member in self]
        return '(|' + ''.join(members) + ')'
class LDAPFilter_not(LDAPFilter):
    """Negation of a single wrapped filter."""

    tag = CLASS_CONTEXT | 0x02

    @classmethod
    def fromBER(klass, tag, content, berdecoder=None):
        """Decode the single wrapped filter; it must consume all of *content*."""
        context = LDAPBERDecoderContext_Filter(fallback=berdecoder,
                                               inherit=berdecoder)
        value, consumed = berDecodeObject(context, content)
        assert consumed == len(content)
        return klass(value=value, tag=tag)

    # The default for 'tag' is the class-level tag defined above.
    def __init__(self, value, tag=tag):
        """Wrap *value*, which is required."""
        LDAPFilter.__init__(self, tag=tag)
        assert value is not None
        self.value = value

    def __repr__(self):
        fields = ['value=%r' % (self.value,)]
        # The tag is only shown when it differs from the class default.
        if self.tag != self.__class__.tag:
            fields.append('tag=%d' % self.tag)
        return self.__class__.__name__ + '(' + ', '.join(fields) + ')'

    def toWire(self):
        """Encode as identification octet, length, then the wrapped filter."""
        payload = to_bytes(self.value)
        header = bytes((self.identification(),))
        return header + int2berlen(len(payload)) + payload

    def asText(self):
        """Return ``(!...)`` wrapping the text of the negated filter."""
        return '(!' + self.value.asText() + ')'
class LDAPFilter_equalityMatch(LDAPAttributeValueAssertion):
    """Assertion rendered as ``(attr=value)``."""

    tag = CLASS_CONTEXT | 0x03

    def asText(self):
        """Return ``(attr=value)`` with the value passed through the escaper."""
        attribute = self.attributeDesc.value
        escaped = self.escaper(self.assertionValue.value)
        return '(' + attribute + '=' + escaped + ')'
class LDAPFilter_substrings_initial(LDAPString):
    """Leading component of a substrings filter (context tag 0)."""

    tag = CLASS_CONTEXT | 0x00

    def asText(self):
        """Return the value passed through this string's escaper."""
        escaper = self.escaper
        return escaper(self.value)
class LDAPFilter_substrings_any(LDAPString):
    """Middle component of a substrings filter (context tag 1)."""

    tag = CLASS_CONTEXT | 0x01

    def asText(self):
        """Return the value passed through this string's escaper."""
        escaper = self.escaper
        return escaper(self.value)
class LDAPFilter_substrings_final(LDAPString):
    """Trailing component of a substrings filter (context tag 2)."""

    tag = CLASS_CONTEXT | 0x02

    def asText(self):
        """Return the value passed through this string's escaper."""
        escaper = self.escaper
        return escaper(self.value)
class LDAPBERDecoderContext_Filter_substrings(BERDecoderContext):
    """Decoder context for the components of a substrings filter."""

    # Each component class is registered under its own tag.
    Identities = {
        component.tag: component
        for component in (
            LDAPFilter_substrings_initial,
            LDAPFilter_substrings_any,
            LDAPFilter_substrings_final,
        )
    }
class LDAPFilter_substrings(BERSequence):
    """Substring filter: an attribute type plus ordered components."""

    tag = CLASS_CONTEXT | 0x04

    @classmethod
    def fromBER(klass, tag, content, berdecoder=None):
        """Decode the type and a non-empty component list from *content*."""
        decoded = berDecodeMultiple(
            content,
            LDAPBERDecoderContext_Filter_substrings(fallback=berdecoder))
        assert len(decoded) == 2
        attr_type, components = decoded[0], decoded[1]
        assert len(components) >= 1
        return klass(type=attr_type.value,
                     substrings=list(components),
                     tag=tag)

    def __init__(self, type=None, substrings=None, tag=None):
        """Store *type* and *substrings*; both are required."""
        BERSequence.__init__(self, value=[], tag=tag)
        assert type is not None
        assert substrings is not None
        self.type = type
        self.substrings = substrings

    def toWire(self):
        """Encode as the type followed by the sequence of components."""
        members = [LDAPString(self.type), BERSequence(self.substrings)]
        return BERSequence(members, tag=self.tag).toWire()

    def __repr__(self):
        fields = [
            'type=%r' % (repr_converter(self.type),),
            'substrings=%r' % (self.substrings,),
        ]
        # The tag is only shown when it differs from the class default.
        if self.tag != self.__class__.tag:
            fields.append('tag=%d' % self.tag)
        return self.__class__.__name__ + '(' + ', '.join(fields) + ')'

    def asText(self):
        """Return the ``(type=initial*any*...*final)`` text form.

        Asserts the ordering: at most one initial component, which comes
        first, and at most one final component, which comes last.
        """
        initial = None
        final = None
        middle = []

        for component in self.substrings:
            assert component is not None
            if isinstance(component, LDAPFilter_substrings_initial):
                assert initial is None
                assert not middle
                assert final is None
                initial = component.asText()
            elif isinstance(component, LDAPFilter_substrings_final):
                assert final is None
                final = component.asText()
            elif isinstance(component, LDAPFilter_substrings_any):
                assert final is None
                middle.append(component.asText())
            else:
                raise NotImplementedError(
                    'TODO: Filter type not supported %r' % component)

        # A missing initial/final renders as an empty string, which leaves
        # a leading/trailing '*' in the joined text.
        head = '' if initial is None else initial
        tail = '' if final is None else final
        pattern = '*'.join([head] + middle + [tail])
        return '(' + self.type + '=' + pattern + ')'
class LDAPFilter_greaterOrEqual(LDAPAttributeValueAssertion):
    """Assertion rendered as ``(attr>=value)``."""

    tag = CLASS_CONTEXT | 0x05

    def asText(self):
        """Return ``(attr>=value)`` with the value passed through the escaper."""
        attribute = self.attributeDesc.value
        escaped = self.escaper(self.assertionValue.value)
        return '(' + attribute + '>=' + escaped + ')'
class LDAPFilter_lessOrEqual(LDAPAttributeValueAssertion):
    """Assertion rendered as ``(attr<=value)``."""

    tag = CLASS_CONTEXT | 0x06

    def asText(self):
        """Return ``(attr<=value)`` with the value passed through the escaper."""
        attribute = self.attributeDesc.value
        escaped = self.escaper(self.assertionValue.value)
        return '(' + attribute + '<=' + escaped + ')'
class LDAPFilter_present(LDAPAttributeDescription):
    """Presence filter, rendered as ``(attr=*)``."""

    tag = CLASS_CONTEXT | 0x07

    def asText(self):
        """Return ``(attr=*)``; the value is inserted without escaping."""
        text = '(%s=*)' % self.value
        return text
class LDAPFilter_approxMatch(LDAPAttributeValueAssertion):
    """Assertion rendered as ``(attr~=value)``."""

    tag = CLASS_CONTEXT | 0x08

    def asText(self):
        """Return ``(attr~=value)`` with the value passed through the escaper."""
        attribute = self.attributeDesc.value
        escaped = self.escaper(self.assertionValue.value)
        return '(' + attribute + '~=' + escaped + ')'
class LDAPMatchingRuleId(LDAPString):
    """Matching rule identifier; an LDAPString with no added behaviour."""
class LDAPAssertionValue(BEROctetString):
    """Assertion value; a BEROctetString with no added behaviour."""
class LDAPMatchingRuleAssertion_matchingRule(LDAPMatchingRuleId):
    """Matching rule member of a matching rule assertion (context tag 1)."""

    tag = CLASS_CONTEXT | 0x01
class LDAPMatchingRuleAssertion_type(LDAPAttributeDescription):
    """Attribute type member of a matching rule assertion (context tag 2)."""

    tag = CLASS_CONTEXT | 0x02
class LDAPMatchingRuleAssertion_matchValue(LDAPAssertionValue):
    """Match value member of a matching rule assertion (context tag 3)."""

    tag = CLASS_CONTEXT | 0x03
class LDAPMatchingRuleAssertion_dnAttributes(BERBoolean):
    """dnAttributes flag of a matching rule assertion (context tag 4)."""

    tag = CLASS_CONTEXT | 0x04
class LDAPBERDecoderContext_MatchingRuleAssertion(BERDecoderContext):
    """Decoder context for the members of a matching rule assertion."""

    # Each member class is registered under its own tag.
    Identities = {
        member.tag: member
        for member in (
            LDAPMatchingRuleAssertion_matchingRule,
            LDAPMatchingRuleAssertion_type,
            LDAPMatchingRuleAssertion_matchValue,
            LDAPMatchingRuleAssertion_dnAttributes,
        )
    }
class LDAPMatchingRuleAssertion(BERSequence):
    """Matching rule assertion: rule, type, match value and dnAttributes."""

    matchingRule = None
    type = None
    matchValue = None
    dnAttributes = None

    @classmethod
    def fromBER(klass, tag, content, berdecoder=None):
        """Decode the optional members in their fixed order.

        Each member is taken from the front of the decoded list only when
        it has the expected class; a truthy match value is required.
        """
        remaining = berDecodeMultiple(
            content,
            LDAPBERDecoderContext_MatchingRuleAssertion(
                fallback=berdecoder, inherit=berdecoder))
        assert 1 <= len(remaining) <= 4

        expected_order = (
            LDAPMatchingRuleAssertion_matchingRule,
            LDAPMatchingRuleAssertion_type,
            LDAPMatchingRuleAssertion_matchValue,
            LDAPMatchingRuleAssertion_dnAttributes,
        )
        found = []
        for expected in expected_order:
            if remaining and isinstance(remaining[0], expected):
                found.append(remaining.pop(0))
            else:
                found.append(None)
        matchingRule, atype, matchValue, dnAttributes = found

        assert matchValue
        if not dnAttributes:
            dnAttributes = None
        return klass(matchingRule=matchingRule,
                     type=atype,
                     matchValue=matchValue,
                     dnAttributes=dnAttributes,
                     tag=tag)

    def __init__(self, matchingRule=None, type=None, matchValue=None,
                 dnAttributes=None, tag=None, escaper=escape):
        """Store the members; *matchValue* is required.

        Plain ``bytes``/``str`` values (and a ``bool`` for *dnAttributes*)
        are wrapped in the corresponding member classes. A falsy
        *dnAttributes* is stored as ``None``.
        """
        BERSequence.__init__(self, value=[], tag=tag)
        assert matchValue is not None
        text_types = (bytes, str)
        if isinstance(matchingRule, text_types):
            matchingRule = LDAPMatchingRuleAssertion_matchingRule(
                matchingRule)
        if isinstance(type, text_types):
            type = LDAPMatchingRuleAssertion_type(type)
        if isinstance(matchValue, text_types):
            matchValue = LDAPMatchingRuleAssertion_matchValue(matchValue)
        if isinstance(dnAttributes, bool):
            dnAttributes = LDAPMatchingRuleAssertion_dnAttributes(
                dnAttributes)
        self.matchingRule = matchingRule
        self.type = type
        self.matchValue = matchValue
        self.dnAttributes = dnAttributes
        if not self.dnAttributes:
            self.dnAttributes = None
        self.escaper = escaper

    def toWire(self):
        """Encode the members that are set, skipping those that are None."""
        candidates = [self.matchingRule, self.type, self.matchValue,
                      self.dnAttributes]
        present = [member for member in candidates if member is not None]
        return BERSequence(present, tag=self.tag).toWire()

    def __repr__(self):
        fields = [
            'matchingRule=%r' % (self.matchingRule,),
            'type=%r' % (self.type,),
            'matchValue=%r' % (self.matchValue,),
            'dnAttributes=%r' % (self.dnAttributes,),
        ]
        # The tag is only shown when it differs from the class default.
        if self.tag != self.__class__.tag:
            fields.append('tag=%d' % self.tag)
        return self.__class__.__name__ + '(' + ', '.join(fields) + ')'
class LDAPFilter_extensibleMatch(LDAPMatchingRuleAssertion):
    """Matching rule assertion used as a filter (context tag 9)."""

    tag = CLASS_CONTEXT | 0x09

    def asText(self):
        """Return ``(type:dn:rule:=value)``, omitting the parts not set."""
        attr_type = self.type.value if self.type else ''
        if self.dnAttributes and self.dnAttributes.value:
            dn_flag = ':dn'
        else:
            dn_flag = ''
        if self.matchingRule:
            rule = ':' + self.matchingRule.value
        else:
            rule = ''
        escaped = self.escaper(self.matchValue.value)
        return '(' + attr_type + dn_flag + rule + ':=' + escaped + ')'
class LDAPBERDecoderContext_Filter(BERDecoderContext):
    """Decoder context that recognises every filter class by its tag."""

    Identities = {
        filter_class.tag: filter_class
        for filter_class in (
            LDAPFilter_and,
            LDAPFilter_or,
            LDAPFilter_not,
            LDAPFilter_equalityMatch,
            LDAPFilter_substrings,
            LDAPFilter_greaterOrEqual,
            LDAPFilter_lessOrEqual,
            LDAPFilter_present,
            LDAPFilter_approxMatch,
            LDAPFilter_extensibleMatch,
        )
    }
# Search scope values.
LDAP_SCOPE_baseObject = 0
LDAP_SCOPE_singleLevel = 1
LDAP_SCOPE_wholeSubtree = 2

# Alias dereferencing values.
LDAP_DEREF_neverDerefAliases = 0
LDAP_DEREF_derefInSearching = 1
LDAP_DEREF_derefFindingBaseObj = 2
LDAP_DEREF_derefAlways = 3

# Presence filter on objectClass, rendered as "(objectClass=*)".
LDAPFilterMatchAll = LDAPFilter_present('objectClass')
class LDAPSearchRequest(LDAPProtocolRequest, BERSequence):
    """Search request; unset parameters fall back to the class defaults."""

    tag = CLASS_APPLICATION | 0x03

    # Class-level defaults, used when the constructor argument is None.
    baseObject = ''
    scope = LDAP_SCOPE_wholeSubtree
    derefAliases = LDAP_DEREF_neverDerefAliases
    sizeLimit = 0
    timeLimit = 0
    typesOnly = 0
    filter = LDAPFilterMatchAll
    attributes = []  # TODO AttributeDescriptionList

    # TODO decode

    @classmethod
    def fromBER(klass, tag, content, berdecoder=None):
        """Decode the eight request members from *content*."""
        fields = berDecodeMultiple(
            content,
            LDAPBERDecoderContext_Filter(fallback=berdecoder,
                                         inherit=berdecoder))
        assert len(fields) == 8
        requested = [attr.value for attr in fields[7]]
        return klass(baseObject=fields[0].value,
                     scope=fields[1].value,
                     derefAliases=fields[2].value,
                     sizeLimit=fields[3].value,
                     timeLimit=fields[4].value,
                     typesOnly=fields[5].value,
                     filter=fields[6],
                     attributes=requested,
                     tag=tag)

    def __init__(self, baseObject=None, scope=None, derefAliases=None,
                 sizeLimit=None, timeLimit=None, typesOnly=None,
                 filter=None, attributes=None, tag=None):
        """Override the class defaults with every argument that is not None."""
        LDAPProtocolRequest.__init__(self)
        BERSequence.__init__(self, [], tag=tag)

        overrides = (
            ('baseObject', baseObject),
            ('scope', scope),
            ('derefAliases', derefAliases),
            ('sizeLimit', sizeLimit),
            ('timeLimit', timeLimit),
            ('typesOnly', typesOnly),
            ('filter', filter),
            ('attributes', attributes),
        )
        for name, given in overrides:
            if given is not None:
                setattr(self, name, given)

    def toWire(self):
        """Encode the eight request members as a sequence with this tag."""
        requested = [BEROctetString(attr) for attr in self.attributes]
        members = [
            BEROctetString(self.baseObject),
            BEREnumerated(self.scope),
            BEREnumerated(self.derefAliases),
            BERInteger(self.sizeLimit),
            BERInteger(self.timeLimit),
            BERBoolean(self.typesOnly),
            self.filter,
            BERSequenceOf(requested),
        ]
        return BERSequence(members, tag=self.tag).toWire()

    def __repr__(self):
        template = ("(baseObject=%s, scope=%s, derefAliases=%s, "
                    "sizeLimit=%s, timeLimit=%s, typesOnly=%s, "
                    "filter=%s, attributes=%s")
        values = (repr(repr_converter(self.baseObject)),
                  self.scope,
                  self.derefAliases,
                  self.sizeLimit,
                  self.timeLimit,
                  self.typesOnly,
                  repr(self.filter),
                  self.attributes)
        text = self.__class__.__name__ + template % values
        # The tag is only shown when it differs from the class default.
        if self.tag != self.__class__.tag:
            text += ", tag=%d" % self.tag
        return text + ")"
class LDAPSearchResultEntry(LDAPProtocolResponse, BERSequence):
    """Search result entry: an object name and its attribute values."""

    tag = CLASS_APPLICATION | 0x04

    @classmethod
    def fromBER(klass, tag, content, berdecoder=None):
        """Decode the name and ``(attribute, [values])`` pairs from *content*."""
        decoded = berDecodeMultiple(
            content,
            LDAPBERDecoderContext_Filter(fallback=berdecoder,
                                         inherit=berdecoder))
        objectName = decoded[0].value
        attributes = [
            (description.value, [item.value for item in values])
            for description, values in decoded[1].data
        ]
        return klass(objectName=objectName, attributes=attributes, tag=tag)

    def __init__(self, objectName, attributes, tag=None):
        """Store *objectName* and *attributes*; neither may be None."""
        LDAPProtocolResponse.__init__(self)
        BERSequence.__init__(self, [], tag=tag)
        assert objectName is not None
        assert attributes is not None
        self.objectName = objectName
        self.attributes = attributes

    def toWire(self):
        """Encode the name and one (description, value set) per attribute."""
        encoded_attributes = []
        for attr_li in self.attributes:
            value_set = BERSet([BEROctetString(x) for x in attr_li[1]])
            encoded_attributes.append(
                BERSequence([BEROctetString(attr_li[0]), value_set]))
        members = [
            BEROctetString(self.objectName),
            BERSequence(encoded_attributes),
        ]
        return BERSequence(members, tag=self.tag).toWire()

    def __repr__(self):
        shown_name = repr_converter(self.objectName)
        shown_attributes = [
            (repr_converter(key), [repr_converter(v) for v in value])
            for (key, value) in self.attributes
        ]
        # The tag is only shown when it differs from the class default.
        if self.tag != self.__class__.tag:
            tag_suffix = ', tag={}'.format(self.tag)
        else:
            tag_suffix = ''
        return '{}(objectName={}, attributes={}{})'.format(
            self.__class__.__name__,
            repr(shown_name),
            repr(shown_attributes),
            tag_suffix,
        )
class LDAPSearchResultDone(LDAPResult):
    """LDAPResult carrying application tag 5."""

    tag = CLASS_APPLICATION | 0x05
class LDAPControls(BERSequence):
    """Sequence of controls, carrying context tag 0."""

    tag = CLASS_CONTEXT | 0x00

    @classmethod
    def fromBER(klass, tag, content, berdecoder=None):
        """Decode the members of *content* with the controls context."""
        context = LDAPBERDecoderContext_LDAPControls(inherit=berdecoder)
        members = berDecodeMultiple(content, context)
        return klass(members, tag=tag)
class LDAPControl(BERSequence):
    """Single control: a type plus optional criticality and value."""

    criticality = None
    controlValue = None

    @classmethod
    def fromBER(klass, tag, content, berdecoder=None):
        """Decode one to three members from *content*.

        With two members, the second is taken as criticality when it is a
        BERBoolean and as the control value when it is a BEROctetString.
        """
        decoded = berDecodeMultiple(content, berdecoder)
        assert 1 <= len(decoded) <= 3

        optional = {}
        count = len(decoded)
        if count == 2:
            second = decoded[1]
            if isinstance(second, BERBoolean):
                optional['criticality'] = second.value
            elif isinstance(second, BEROctetString):
                optional['controlValue'] = second.value
        elif count == 3:
            optional['criticality'] = decoded[1].value
            optional['controlValue'] = decoded[2].value

        return klass(controlType=decoded[0].value, tag=tag, **optional)

    def __init__(self, controlType, criticality=None, controlValue=None,
                 id=None, tag=None):
        """Store the control fields; *controlType* is required.

        *id* is accepted but not used.
        """
        BERSequence.__init__(self, value=[], tag=tag)
        assert controlType is not None
        self.controlType = controlType
        self.criticality = criticality
        self.controlValue = controlValue

    def toWire(self):
        """Rebuild ``self.data`` from the fields, then encode as a sequence."""
        members = [LDAPOID(self.controlType)]
        self.data = members
        if self.criticality is not None:
            members.append(BERBoolean(self.criticality))
        if self.controlValue is not None:
            members.append(BEROctetString(self.controlValue))
        return BERSequence.toWire(self)
class LDAPBERDecoderContext_LDAPControls(BERDecoderContext):
    """Decoder context that decodes LDAPControl's tag as LDAPControl."""

    Identities = {
        LDAPControl.tag: LDAPControl,
    }
class LDAPBERDecoderContext_LDAPMessage(BERDecoderContext):
    """Decoder context for controls and search result references."""

    Identities = {
        LDAPControls.tag: LDAPControls,
        LDAPSearchResultReference.tag: LDAPSearchResultReference,
    }
class LDAPBERDecoderContext_TopLevel(BERDecoderContext):
    """Decoder context that decodes a plain sequence as an LDAPMessage."""

    Identities = {
        BERSequence.tag: LDAPMessage,
    }
class LDAPModifyRequest(LDAPProtocolRequest, BERSequence):
    """Modify request: the target object and a list of modifications."""

    tag = CLASS_APPLICATION | 0x06
    object = None
    modification = None

    @classmethod
    def fromBER(klass, tag, content, berdecoder=None):
        """Decode exactly two members: the object and its modifications."""
        decoded = berDecodeMultiple(content, berdecoder)
        assert len(decoded) == 2
        target, changes = decoded[0], decoded[1]
        return klass(object=target.value, modification=changes.data, tag=tag)

    def __init__(self, object=None, modification=None, tag=None):
        """
        Initialize the object

        Example usage::

                l = LDAPModifyRequest(
                    object='cn=foo,dc=example,dc=com',
                    modification=[

                      BERSequence([
                        BEREnumerated(0),
                        BERSequence([
                          LDAPAttributeDescription('attr1'),
                          BERSet([
                            LDAPString('value1'),
                            LDAPString('value2'),
                            ]),
                          ]),
                        ]),

                      BERSequence([
                        BEREnumerated(1),
                        BERSequence([
                          LDAPAttributeDescription('attr2'),
                          ]),
                        ]),

                    ])

        But more likely you just want to say::

                mod = delta.ModifyOp('cn=foo,dc=example,dc=com',
                    [delta.Add('attr1', ['value1', 'value2']),
                     delta.Delete('attr1', ['value1', 'value2'])])
                l = mod.asLDAP()
        """
        LDAPProtocolRequest.__init__(self)
        BERSequence.__init__(self, [], tag=tag)
        self.object = object
        self.modification = modification

    def toWire(self):
        """Encode the object, plus the modifications when they are set."""
        members = [LDAPString(self.object)]
        if self.modification is not None:
            members.append(BERSequence(self.modification))
        return BERSequence(members, tag=self.tag).toWire()

    def __repr__(self):
        fields = [
            'object=%r' % (repr_converter(self.object),),
            'modification=%r' % (self.modification,),
        ]
        # The tag is only shown when it differs from the class default.
        if self.tag != self.__class__.tag:
            fields.append('tag=%d' % self.tag)
        return self.__class__.__name__ + '(' + ', '.join(fields) + ')'
class LDAPModifyResponse(LDAPResult):
    """LDAPResult carrying application tag 7."""

    tag = CLASS_APPLICATION | 0x07
class LDAPAddRequest(LDAPProtocolRequest, BERSequence):
    """Add request: the entry name and its attributes."""

    tag = CLASS_APPLICATION | 0x08

    @classmethod
    def fromBER(klass, tag, content, berdecoder=None):
        """Decode the entry name and the attribute list from *content*."""
        decoded = berDecodeMultiple(content, berdecoder)
        return klass(entry=decoded[0].value, attributes=decoded[1], tag=tag)

    def __init__(self, entry=None, attributes=None, tag=None):
        """
        Initialize the object

        Example usage::

                l=LDAPAddRequest(entry='cn=foo,dc=example,dc=com',
                        attributes=[(LDAPAttributeDescription("attrFoo"),
                             BERSet(value=(
                                 LDAPAttributeValue("value1"),
                                 LDAPAttributeValue("value2"),
                             ))),
                             (LDAPAttributeDescription("attrBar"),
                             BERSet(value=(
                                 LDAPAttributeValue("value1"),
                                 LDAPAttributeValue("value2"),
                             ))),
                             ])
        """
        LDAPProtocolRequest.__init__(self)
        BERSequence.__init__(self, [], tag=tag)
        self.entry = entry
        self.attributes = attributes

    def toWire(self):
        """Encode the entry name and each attribute as its own sequence."""
        encoded_attributes = [
            BERSequence(attribute) for attribute in self.attributes
        ]
        members = [LDAPString(self.entry), BERSequence(encoded_attributes)]
        return BERSequence(members, tag=self.tag).toWire()

    def __repr__(self):
        fields = [
            'entry=%r' % (repr_converter(self.entry),),
            'attributes=%r' % (self.attributes,),
        ]
        # The tag is only shown when it differs from the class default.
        if self.tag != self.__class__.tag:
            fields.append('tag=%d' % self.tag)
        return self.__class__.__name__ + '(' + ', '.join(fields) + ')'
class LDAPAddResponse(LDAPResult):
    """LDAPResult carrying application tag 9."""

    tag = CLASS_APPLICATION | 0x09
class LDAPDelRequest(LDAPProtocolRequest, LDAPString):
    """Delete request: the entry name encoded as a tagged string."""

    tag = CLASS_APPLICATION | 0x0a

    def __init__(self, value=None, entry=None, tag=None):
        """
        Initialize the object

        l=LDAPDelRequest(entry='cn=foo,dc=example,dc=com')

        *value* is used as the entry when *entry* is not given.
        """
        if entry is None and value is not None:
            entry = value
        LDAPProtocolRequest.__init__(self)
        LDAPString.__init__(self, value=entry, tag=tag)

    def toWire(self):
        """Use LDAPString's encoding rather than LDAPProtocolOp.toWire."""
        return LDAPString.toWire(self)

    def __repr__(self):
        fields = ['entry=%r' % (repr_converter(self.value),)]
        # The tag is only shown when it differs from the class default.
        if self.tag != self.__class__.tag:
            fields.append('tag=%d' % self.tag)
        return self.__class__.__name__ + '(' + ', '.join(fields) + ')'
class LDAPDelResponse(LDAPResult):
    """LDAPResult carrying application tag 11."""

    tag = CLASS_APPLICATION | 0x0b
class LDAPModifyDNResponse_newSuperior(LDAPString):
    """String carrying context tag 0, used for the newSuperior member."""

    tag = CLASS_CONTEXT | 0x00
class LDAPBERDecoderContext_ModifyDNRequest(BERDecoderContext):
    """Decoder context that recognises the newSuperior member by its tag."""

    Identities = {
        LDAPModifyDNResponse_newSuperior.tag:
            LDAPModifyDNResponse_newSuperior,
    }
class LDAPModifyDNRequest(LDAPProtocolRequest, BERSequence):
    """Modify DN request: entry, new RDN, delete flag, optional superior."""

    tag = CLASS_APPLICATION | 12

    entry = None
    newrdn = None
    deleteoldrdn = None
    newSuperior = None

    @classmethod
    def fromBER(klass, tag, content, berdecoder=None):
        """Decode the members; string values are converted with to_bytes."""
        decoded = berDecodeMultiple(
            content,
            LDAPBERDecoderContext_ModifyDNRequest(fallback=berdecoder))

        optional = {}
        try:
            optional['newSuperior'] = to_bytes(decoded[3].value)
        except IndexError:
            # The fourth member is optional.
            pass

        return klass(entry=to_bytes(decoded[0].value),
                     newrdn=to_bytes(decoded[1].value),
                     deleteoldrdn=decoded[2].value,
                     tag=tag,
                     **optional)

    def __init__(self, entry, newrdn, deleteoldrdn, newSuperior=None,
                 tag=None):
        """
        Initialize the object

        Example usage::

                l=LDAPModifyDNRequest(entry='cn=foo,dc=example,dc=com',
                                      newrdn='someAttr=value',
                                      deleteoldrdn=0)
        """
        LDAPProtocolRequest.__init__(self)
        BERSequence.__init__(self, [], tag=tag)
        assert entry is not None
        assert newrdn is not None
        assert deleteoldrdn is not None
        self.entry = entry
        self.newrdn = newrdn
        self.deleteoldrdn = deleteoldrdn
        self.newSuperior = newSuperior

    def toWire(self):
        """Encode the three required members, plus newSuperior when set."""
        members = [
            LDAPString(self.entry),
            LDAPString(self.newrdn),
            BERBoolean(self.deleteoldrdn),
        ]
        if self.newSuperior is not None:
            members.append(
                LDAPString(self.newSuperior, tag=CLASS_CONTEXT | 0))
        return BERSequence(members, tag=self.tag).toWire()

    def __repr__(self):
        fields = [
            'entry=%r' % (repr_converter(self.entry),),
            'newrdn=%r' % (repr_converter(self.newrdn),),
            'deleteoldrdn=%r' % (self.deleteoldrdn,),
        ]
        if self.newSuperior is not None:
            fields.append(
                'newSuperior=%r' % (repr_converter(self.newSuperior),))
        # The tag is only shown when it differs from the class default.
        if self.tag != self.__class__.tag:
            fields.append('tag=%d' % self.tag)
        return self.__class__.__name__ + '(' + ', '.join(fields) + ')'
class LDAPModifyDNResponse(LDAPResult):
    """LDAPResult carrying application tag 13."""

    tag = CLASS_APPLICATION | 13
class LDAPBERDecoderContext_Compare(BERDecoderContext):
    """Decoder context that decodes a sequence as an attribute assertion."""

    Identities = {
        BERSequence.tag: LDAPAttributeValueAssertion,
    }
class LDAPCompareRequest(LDAPProtocolRequest, BERSequence):
    """Compare request: an entry name and an attribute value assertion."""

    tag = CLASS_APPLICATION | 14

    entry = None
    ava = None

    @classmethod
    def fromBER(klass, tag, content, berdecoder=None):
        """Decode the entry name and the assertion from *content*."""
        context = LDAPBERDecoderContext_Compare(fallback=berdecoder,
                                                inherit=berdecoder)
        decoded = berDecodeMultiple(content, context)
        return klass(entry=decoded[0].value, ava=decoded[1], tag=tag)

    def __init__(self, entry, ava, tag=None):
        """Store *entry* and *ava*; neither may be None."""
        LDAPProtocolRequest.__init__(self)
        BERSequence.__init__(self, [], tag=tag)
        assert entry is not None
        assert ava is not None
        self.entry = entry
        self.ava = ava

    def toWire(self):
        """Encode the entry name followed by the assertion."""
        members = [LDAPString(self.entry), self.ava]
        return BERSequence(members, tag=self.tag).toWire()

    def __repr__(self):
        # Unlike the other requests here, the tag is never shown.
        return '%s(entry=%r, ava=%r)' % (
            self.__class__.__name__,
            repr_converter(self.entry),
            self.ava,
        )
class LDAPCompareResponse(LDAPResult):
    """LDAPResult subclass for compare responses (application tag 15)."""

    tag = CLASS_APPLICATION | 15
class LDAPAbandonRequest(LDAPProtocolRequest, LDAPInteger):
    """
    Abandon request; the integer value is the id given at construction.
    """

    tag = CLASS_APPLICATION | 0x10

    # NOTE(review): presumably signals that no reply is expected for this
    # request -- confirm against LDAPProtocolRequest.
    needs_answer = 0

    def __init__(self, value=None, id=None, tag=None):
        """
        Initialize the object.

        ``value`` is accepted as an alternative spelling of ``id`` and is
        used only when ``id`` is None.

        Example usage::

            l = LDAPAbandonRequest(id=1)
        """
        message_id = value if id is None else id
        LDAPProtocolRequest.__init__(self)
        LDAPInteger.__init__(self, value=message_id, tag=tag)

    def toWire(self):
        """Serialize using LDAPInteger's encoding."""
        return LDAPInteger.toWire(self)

    def __repr__(self):
        """
        Return a constructor-like description; the tag is shown only when
        it differs from the class-level tag.
        """
        name = self.__class__.__name__
        if self.tag != self.__class__.tag:
            return name + "(id=%s, tag=%d)" % (repr(self.value), self.tag)
        return name + "(id=%s)" % repr(self.value)
class LDAPOID(BEROctetString):
    """Octet string subtype used for object identifiers; adds no behaviour."""
class LDAPResponseName(LDAPOID):
    """LDAPOID carrying context tag 10."""

    tag = CLASS_CONTEXT | 10
class LDAPResponse(BEROctetString):
    """Octet string carrying context tag 11."""

    tag = CLASS_CONTEXT | 11
class LDAPBERDecoderContext_LDAPExtendedRequest(BERDecoderContext):
    """
    Decoder context for extended requests: context tags 0 and 1 are both
    decoded as plain BEROctetString.
    """

    Identities = {
        CLASS_CONTEXT | 0x00: BEROctetString,
        CLASS_CONTEXT | 0x01: BEROctetString,
    }
class LDAPExtendedRequest(LDAPProtocolRequest, BERSequence):
    """
    Extended request: a request name plus an optional request value.
    """

    tag = CLASS_APPLICATION | 23

    requestName = None
    requestValue = None

    @classmethod
    def fromBER(klass, tag, content, berdecoder=None):
        """
        Build an instance from BER-encoded content.

        The first decoded item supplies requestName; a second item, when
        present, supplies requestValue.
        """
        context = LDAPBERDecoderContext_LDAPExtendedRequest(
            fallback=berdecoder)
        decoded = berDecodeMultiple(content, context)

        optional = {}
        try:
            optional['requestValue'] = decoded[1].value
        except IndexError:
            # Only the name was decoded; leave requestValue at its default.
            pass

        return klass(requestName=decoded[0].value, tag=tag, **optional)

    def __init__(self, requestName=None, requestValue=None, tag=None):
        """
        Set up the request.

        requestName is required and must be bytes or str; requestValue,
        when given, must be bytes or str as well.
        """
        LDAPProtocolRequest.__init__(self)
        BERSequence.__init__(self, [], tag=tag)

        accepted = (bytes, str)
        assert requestName is not None
        assert isinstance(requestName, accepted)
        if requestValue is not None:
            assert isinstance(requestValue, accepted)

        self.requestName, self.requestValue = requestName, requestValue

    def toWire(self):
        """
        Serialize as a BERSequence holding the name (LDAPOID, context
        tag 0) and, when set, the value converted with to_bytes
        (BEROctetString, context tag 1).
        """
        components = [LDAPOID(self.requestName, tag=CLASS_CONTEXT | 0)]
        if self.requestValue is not None:
            raw_value = to_bytes(self.requestValue)
            components.append(
                BEROctetString(raw_value, tag=CLASS_CONTEXT | 1))

        envelope = BERSequence(components, tag=self.tag)
        return envelope.toWire()
class LDAPPasswordModifyRequest_userIdentity(BEROctetString):
    """Octet string carrying context tag 0."""

    tag = CLASS_CONTEXT | 0
class LDAPPasswordModifyRequest_passwd(BEROctetString):
    """Octet string whose repr masks the stored value with asterisks."""

    def __repr__(self):
        """
        Return a description with one ``*`` per element of the value, so
        the value itself is never shown. The tag is appended only when it
        differs from the class-level tag.
        """
        masked = '*' * len(self.value)

        if self.tag != self.__class__.tag:
            tag_suffix = ', tag={}'.format(self.tag)
        else:
            tag_suffix = ''

        return '{}(value={}{})'.format(
            self.__class__.__name__,
            repr(masked),
            tag_suffix,
        )
class LDAPPasswordModifyRequest_oldPasswd(LDAPPasswordModifyRequest_passwd):
    """Masked password string carrying context tag 1."""

    tag = CLASS_CONTEXT | 1
class LDAPPasswordModifyRequest_newPasswd(LDAPPasswordModifyRequest_passwd):
    """Masked password string carrying context tag 2."""

    tag = CLASS_CONTEXT | 2
class LDAPBERDecoderContext_LDAPPasswordModifyRequest(BERDecoderContext):
    """
    Decoder context mapping the tags of the three password-modify
    component classes to those classes.
    """

    # Each component class is registered under its own tag.
    Identities = {
        component.tag: component
        for component in (
            LDAPPasswordModifyRequest_userIdentity,
            LDAPPasswordModifyRequest_oldPasswd,
            LDAPPasswordModifyRequest_newPasswd,
        )
    }
class LDAPPasswordModifyRequest(LDAPExtendedRequest):
    """
    Extended request with a fixed oid whose request value is a serialized
    BERSequence of the optional userIdentity, oldPasswd and newPasswd
    components.
    """

    oid = b'1.3.6.1.4.1.4203.1.11.1'

    def __init__(self, requestName=None, userIdentity=None,
                 oldPasswd=None, newPasswd=None, tag=None):
        """
        Set up the request.

        requestName, when given, must equal the class oid. Each of
        userIdentity, oldPasswd and newPasswd that is not None is wrapped
        in its component class, stored on the instance, and included in
        the request value in that order.
        """
        assert (requestName is None or requestName == self.oid), \
            '%s requestName was %s instead of %s' \
            % (self.__class__.__name__, requestName, self.oid)
        # TODO genPasswd

        self.userIdentity = (
            None if userIdentity is None
            else LDAPPasswordModifyRequest_userIdentity(userIdentity)
        )
        self.oldPasswd = (
            None if oldPasswd is None
            else LDAPPasswordModifyRequest_oldPasswd(oldPasswd)
        )
        self.newPasswd = (
            None if newPasswd is None
            else LDAPPasswordModifyRequest_newPasswd(newPasswd)
        )

        # Only the components actually supplied go on the wire.
        supplied = [
            component
            for component in (self.userIdentity, self.oldPasswd, self.newPasswd)
            if component is not None
        ]

        LDAPExtendedRequest.__init__(
            self,
            requestName=self.oid,
            requestValue=BERSequence(supplied).toWire(),
            tag=tag,
        )

    def __repr__(self):
        """
        Return a constructor-like description listing only the components
        that are set, plus the tag when it differs from the class tag.
        """
        candidates = (
            ('userIdentity={}', self.userIdentity),
            ('oldPasswd={}', self.oldPasswd),
            ('newPasswd={}', self.newPasswd),
        )
        parts = [
            template.format(repr(component))
            for template, component in candidates
            if component is not None
        ]
        if self.tag != self.__class__.tag:
            parts.append('tag=%d' % self.tag)

        arguments = ', '.join(parts)
        return self.__class__.__name__ + '(' + arguments + ')'
class LDAPBERDecoderContext_LDAPExtendedResponse(BERDecoderContext):
    """
    Decoder context mapping the LDAPResponseName and LDAPResponse tags to
    those classes.
    """

    Identities = {
        component.tag: component
        for component in (LDAPResponseName, LDAPResponse)
    }
class LDAPExtendedResponse(LDAPResult):
    """
    LDAPResult extended with an optional response name and response value.
    """

    tag = CLASS_APPLICATION | 0x18

    responseName = None
    response = None

    @classmethod
    def fromBER(klass, tag, content, berdecoder=None):
        """
        Build an instance from BER-encoded content.

        The first three decoded items supply resultCode, matchedDN and
        errorMessage. Any further items must be LDAPResponseName,
        LDAPResponse or LDAPReferral; referrals are recognised but
        currently ignored.
        """
        context = LDAPBERDecoderContext_LDAPExtendedResponse(
            fallback=berdecoder)
        decoded = berDecodeMultiple(content, context)

        count = len(decoded)
        assert 3 <= count <= 6

        responseName = None
        response = None
        for item in decoded[3:]:
            if isinstance(item, LDAPResponseName):
                responseName = item.value
                continue
            if isinstance(item, LDAPResponse):
                response = item.value
                continue
            # TODO support referrals: a referral is accepted here but not
            # stored; anything else is rejected.
            assert isinstance(item, LDAPReferral)

        return klass(
            resultCode=decoded[0].value,
            matchedDN=decoded[1].value,
            errorMessage=decoded[2].value,
            referral=None,  # referrals are never decoded (see TODO above)
            responseName=responseName,
            response=response,
            tag=tag,
        )

    def __init__(self, resultCode=None, matchedDN=None, errorMessage=None,
                 referral=None, serverSaslCreds=None,
                 responseName=None, response=None, tag=None):
        """
        Set up the response.

        The result-related arguments are handed to LDAPResult.__init__;
        responseName and response are stored on the instance.
        """
        LDAPResult.__init__(
            self,
            resultCode=resultCode,
            matchedDN=matchedDN,
            errorMessage=errorMessage,
            referral=referral,
            serverSaslCreds=serverSaslCreds,
            tag=tag,
        )
        self.responseName, self.response = responseName, response

    def toWire(self):
        """
        Serialize as a BERSequence of resultCode, matchedDN and
        errorMessage, followed by responseName (context tag 0x0a) and
        response (context tag 0x0b) when they are set.

        Referrals are not supported yet, so referral must be None.
        """
        assert self.referral is None  # TODO
        components = [
            BEREnumerated(self.resultCode),
            BEROctetString(self.matchedDN),
            BEROctetString(self.errorMessage),
            # TODO referral [3] Referral OPTIONAL
        ]
        if self.responseName is not None:
            name = LDAPOID(self.responseName, tag=CLASS_CONTEXT | 0x0a)
            components.append(name)
        if self.response is not None:
            payload = BEROctetString(self.response, tag=CLASS_CONTEXT | 0x0b)
            components.append(payload)

        envelope = BERSequence(components, tag=self.tag)
        return envelope.toWire()
class LDAPStartTLSRequest(LDAPExtendedRequest):
    """
    Request to start Transport Layer Security.
    See RFC 2830 for details.
    """

    oid = b'1.3.6.1.4.1.1466.20037'

    def __init__(self, requestName=None, tag=None):
        """
        Set up the request.

        requestName, when given, must equal the class oid; the class oid
        is always what is passed on to LDAPExtendedRequest.
        """
        assert (requestName is None or requestName == self.oid), \
            '%s requestName was %s instead of %s' \
            % (self.__class__.__name__, requestName, self.oid)

        LDAPExtendedRequest.__init__(self, requestName=self.oid, tag=tag)

    def __repr__(self):
        """
        Return a constructor-like description; the tag is shown only when
        it differs from the class-level tag.
        """
        if self.tag != self.__class__.tag:
            arguments = 'tag={}'.format(self.tag)
        else:
            arguments = ''
        return self.__class__.__name__ + '(' + arguments + ')'
class LDAPStartTLSResponse(LDAPExtendedResponse):
    """
    Response to start Transport Layer Security.
    See RFC 4511 section 4.14.2 for details.
    """

    oid = b'1.3.6.1.4.1.1466.20037'

    def __init__(self, resultCode=None, matchedDN=None, errorMessage=None,
                 referral=None, serverSaslCreds=None,
                 responseName=None, response=None, tag=None):
        """
        Set up the response by passing every argument through to
        LDAPExtendedResponse.__init__ unchanged.
        """
        LDAPExtendedResponse.__init__(
            self,
            resultCode=resultCode,
            matchedDN=matchedDN,
            errorMessage=errorMessage,
            referral=referral,
            serverSaslCreds=serverSaslCreds,
            responseName=responseName,
            response=response,
            tag=tag,
        )

    def __repr__(self):
        """
        Return a constructor-like description; the tag is shown only when
        it differs from the class-level tag.
        """
        if self.tag != self.__class__.tag:
            arguments = 'tag={}'.format(self.tag)
        else:
            arguments = ''
        return self.__class__.__name__ + '(' + arguments + ')'
class LDAPBERDecoderContext(BERDecoderContext):
    """
    Decoder context mapping the tag of each listed LDAP message class to
    that class.
    """

    # Every class is registered under its own ``tag``; the order below is
    # the registration order.
    Identities = {
        message.tag: message
        for message in (
            LDAPBindResponse,
            LDAPBindRequest,
            LDAPUnbindRequest,
            LDAPSearchRequest,
            LDAPSearchResultEntry,
            LDAPSearchResultDone,
            LDAPSearchResultReference,
            LDAPReferral,
            LDAPModifyRequest,
            LDAPModifyResponse,
            LDAPAddRequest,
            LDAPAddResponse,
            LDAPDelRequest,
            LDAPDelResponse,
            LDAPExtendedRequest,
            LDAPExtendedResponse,
            LDAPModifyDNRequest,
            LDAPModifyDNResponse,
            LDAPAbandonRequest,
            LDAPCompareRequest,
            LDAPCompareResponse,
        )
    }