"""LDAP protocol message conversion; no application logic here."""
import string
from ldaptor.protocols.pureber import (
BERBoolean,
BERDecoderContext,
BEREnumerated,
BERInteger,
BERNull,
BEROctetString,
BERSequence,
BERSequenceOf,
BERSet,
BERStructured,
CLASS_APPLICATION,
CLASS_CONTEXT,
berDecodeMultiple,
berDecodeObject,
int2berlen,
)
from ldaptor._encoder import to_bytes
next_ldap_message_id = 1
[docs]def alloc_ldap_message_id():
global next_ldap_message_id
r = next_ldap_message_id
next_ldap_message_id = next_ldap_message_id + 1
return r
[docs]def escape(s):
s = s.replace("\\", r"\5c")
s = s.replace("*", r"\2a")
s = s.replace("(", r"\28")
s = s.replace(")", r"\29")
s = s.replace("\0", r"\00")
return s
[docs]def binary_escape(s):
return "".join(f"\\{ord(c):02x}" for c in s)
[docs]def smart_escape(s, threshold=0.30):
binary_count = sum(c not in string.printable for c in s)
if float(binary_count) / float(len(s)) > threshold:
return binary_escape(s)
return escape(s)
[docs]class LDAPInteger(BERInteger):
pass
[docs]class LDAPString(BEROctetString):
def __init__(self, *args, **kwargs):
self.escaper = kwargs.pop("escaper", escape)
super().__init__(*args, **kwargs)
[docs]class LDAPAttributeValue(BEROctetString):
pass
[docs]class LDAPMessage(BERSequence):
"""
To encode this object in order to be sent over the network use the toWire()
method.
"""
id = None
value = None
[docs] @classmethod
def fromBER(klass, tag, content, berdecoder=None):
l = berDecodeMultiple(content, berdecoder)
id_ = l[0].value
value = l[1]
if l[2:]:
controls = []
for c in l[2]:
controls.append(
(
c.controlType,
c.criticality,
c.controlValue,
)
)
else:
controls = None
assert not l[3:]
r = klass(id=id_, value=value, controls=controls, tag=tag)
return r
def __init__(self, value=None, controls=None, id=None, tag=None):
BERSequence.__init__(self, value=[], tag=tag)
assert value is not None
self.id = id
if self.id is None:
self.id = alloc_ldap_message_id()
self.value = value
self.controls = controls
[docs] def toWire(self):
"""
This is the wire/encoded representation.
"""
l = [BERInteger(self.id), self.value]
if self.controls is not None:
l.append(LDAPControls([LDAPControl(*a) for a in self.controls]))
return BERSequence(l).toWire()
def __repr__(self):
l = []
l.append("id=%r" % self.id)
l.append("value=%r" % self.value)
l.append("controls=%r" % self.controls)
if self.tag != self.__class__.tag:
l.append("tag=%d" % self.tag)
return self.__class__.__name__ + "(" + ", ".join(l) + ")"
[docs]class LDAPProtocolOp:
def __init__(self):
pass
[docs] def toWire(self):
raise NotImplementedError()
[docs]class LDAPProtocolRequest(LDAPProtocolOp):
needs_answer = 1
[docs]class LDAPProtocolResponse(LDAPProtocolOp):
pass
[docs]class LDAPBERDecoderContext_LDAPBindRequest(BERDecoderContext):
Identities = {
CLASS_CONTEXT | 0x00: BEROctetString,
CLASS_CONTEXT | 0x03: BERSequence,
}
[docs]class LDAPBindRequest(LDAPProtocolRequest, BERSequence):
tag = CLASS_APPLICATION | 0x00
[docs] @classmethod
def fromBER(klass, tag, content, berdecoder=None):
l = berDecodeMultiple(
content, LDAPBERDecoderContext_LDAPBindRequest(fallback=berdecoder)
)
sasl = False
auth = None
if isinstance(l[2], BEROctetString):
auth = l[2].value
elif isinstance(l[2], BERSequence):
# per https://ldap.com/ldapv3-wire-protocol-reference-bind/
# Credentials are optional and not always provided
if len(l[2].data) == 2:
auth = (l[2][0].value, l[2][1].value)
else:
auth = (l[2][0].value, None)
sasl = True
r = klass(version=l[0].value, dn=l[1].value, auth=auth, tag=tag, sasl=sasl)
return r
def __init__(self, version=None, dn=None, auth=None, tag=None, sasl=False):
"""Constructor for LDAP Bind Request
For sasl=False, pass a string password for 'auth'
For sasl=True, pass a tuple of (mechanism, credentials) for 'auth'"""
LDAPProtocolRequest.__init__(self)
BERSequence.__init__(self, [], tag=tag)
self.version = version
if self.version is None:
self.version = 3
self.dn = dn
if self.dn is None:
self.dn = ""
self.auth = auth
if self.auth is None:
self.auth = ""
assert not sasl
self.sasl = sasl
[docs] def toWire(self):
if not self.sasl:
auth_ber = BEROctetString(self.auth, tag=CLASS_CONTEXT | 0)
else:
# since the credentails for SASL is optional must check first
# if credentials are None don't send them.
if self.auth[1]:
auth_ber = BERSequence(
[BEROctetString(self.auth[0]), BEROctetString(self.auth[1])],
tag=CLASS_CONTEXT | 3,
)
else:
auth_ber = BERSequence(
[BEROctetString(self.auth[0])], tag=CLASS_CONTEXT | 3
)
return BERSequence(
[
BERInteger(self.version),
BEROctetString(self.dn),
auth_ber,
],
tag=self.tag,
).toWire()
def __repr__(self):
auth = "*" * len(self.auth)
l = []
l.append("version=%d" % self.version)
l.append("dn=%s" % repr(self.dn))
l.append("auth=%s" % repr(auth))
if self.tag != self.__class__.tag:
l.append("tag=%d" % self.tag)
l.append("sasl=%s" % repr(self.sasl))
return self.__class__.__name__ + "(" + ", ".join(l) + ")"
[docs]class LDAPReferral(BERSequence):
tag = CLASS_CONTEXT | 0x03
[docs]class LDAPBERDecoderContext_LDAPSearchResultReference(BERDecoderContext):
Identities = {
BEROctetString.tag: LDAPString,
}
[docs]class LDAPSearchResultReference(LDAPProtocolResponse, BERSequence):
tag = CLASS_APPLICATION | 0x13
def __init__(self, uris=None, tag=None):
LDAPProtocolResponse.__init__(self)
BERSequence.__init__(self, value=[], tag=tag)
assert uris is not None
self.uris = uris
[docs] @classmethod
def fromBER(cls, tag, content, berdecoder=None):
l = berDecodeMultiple(
content,
LDAPBERDecoderContext_LDAPSearchResultReference(fallback=berdecoder),
)
r = cls(uris=l)
return r
[docs] def toWire(self):
return BERSequence(BERSequence(self.uris), tag=self.tag).toWire()
def __repr__(self):
return "{}(uris={}{})".format(
self.__class__.__name__,
repr([uri for uri in self.uris]),
f", tag={self.tag}" if self.tag != self.__class__.tag else "",
)
[docs]class LDAPResult(LDAPProtocolResponse, BERSequence):
[docs] @classmethod
def fromBER(klass, tag, content, berdecoder=None):
l = berDecodeMultiple(
content, LDAPBERDecoderContext_LDAPBindRequest(fallback=berdecoder)
)
assert 3 <= len(l) <= 4
referral = None
# if (l[3:] and isinstance(l[3], LDAPReferral)):
# TODO support referrals
# self.referral=self.data[0]
r = klass(
resultCode=l[0].value,
matchedDN=l[1].value,
errorMessage=l[2].value,
referral=referral,
tag=tag,
)
return r
def __init__(
self,
resultCode=None,
matchedDN=None,
errorMessage=None,
referral=None,
serverSaslCreds=None,
tag=None,
):
LDAPProtocolResponse.__init__(self)
BERSequence.__init__(self, value=[], tag=tag)
assert resultCode is not None
self.resultCode = resultCode
if matchedDN is None:
matchedDN = ""
self.matchedDN = matchedDN
if errorMessage is None:
errorMessage = ""
self.errorMessage = errorMessage
self.referral = referral
self.serverSaslCreds = serverSaslCreds
[docs] def toWire(self):
assert self.referral is None # TODO
if self.serverSaslCreds:
return BERSequence(
[
BEREnumerated(self.resultCode),
BEROctetString(self.matchedDN),
BEROctetString(self.errorMessage),
LDAPBindResponse_serverSaslCreds(self.serverSaslCreds),
],
tag=self.tag,
).toWire()
else:
return BERSequence(
[
BEREnumerated(self.resultCode),
BEROctetString(self.matchedDN),
BEROctetString(self.errorMessage),
],
tag=self.tag,
).toWire()
def __repr__(self):
l = []
l.append("resultCode=%r" % self.resultCode)
if self.matchedDN:
l.append("matchedDN=%r" % self.matchedDN)
if self.errorMessage:
l.append("errorMessage=%r" % self.errorMessage)
if self.referral:
l.append("referral=%r" % self.referral)
if self.tag != self.__class__.tag:
l.append("tag=%d" % self.tag)
return self.__class__.__name__ + "(" + ", ".join(l) + ")"
[docs]class LDAPBindResponse_serverSaslCreds(BEROctetString):
tag = CLASS_CONTEXT | 0x07
def __repr__(self):
if self.tag == self.__class__.tag:
return self.__class__.__name__ + "(value=%s)" % self.value
else:
return self.__class__.__name__ + "(value=%s, tag=%d)" % (
self.value,
self.tag,
)
[docs]class LDAPBERDecoderContext_BindResponse(BERDecoderContext):
Identities = {
LDAPBindResponse_serverSaslCreds.tag: LDAPBindResponse_serverSaslCreds,
}
[docs]class LDAPBindResponse(LDAPResult):
tag = CLASS_APPLICATION | 0x01
resultCode = None
matchedDN = None
errorMessage = None
referral = None
serverSaslCreds = None
[docs] @classmethod
def fromBER(klass, tag, content, berdecoder=None):
l = berDecodeMultiple(
content, LDAPBERDecoderContext_BindResponse(fallback=berdecoder)
)
assert 3 <= len(l) <= 4
try:
if isinstance(l[3], LDAPBindResponse_serverSaslCreds):
serverSaslCreds = l[3].value
else:
serverSaslCreds = None
except IndexError:
serverSaslCreds = None
referral = None
# if (l[3:] and isinstance(l[3], LDAPReferral)):
# TODO support referrals
# self.referral=self.data[0]
r = klass(
resultCode=l[0].value,
matchedDN=l[1].value,
errorMessage=l[2].value,
referral=referral,
serverSaslCreds=serverSaslCreds,
tag=tag,
)
return r
def __init__(
self,
resultCode=None,
matchedDN=None,
errorMessage=None,
referral=None,
serverSaslCreds=None,
tag=None,
):
LDAPResult.__init__(
self,
resultCode=resultCode,
matchedDN=matchedDN,
errorMessage=errorMessage,
referral=referral,
serverSaslCreds=serverSaslCreds,
tag=None,
)
def __repr__(self):
return LDAPResult.__repr__(self)
[docs]class LDAPUnbindRequest(LDAPProtocolRequest, BERNull):
tag = CLASS_APPLICATION | 0x02
needs_answer = 0
def __init__(self, *args, **kwargs):
LDAPProtocolRequest.__init__(self)
BERNull.__init__(self, *args, **kwargs)
[docs] def toWire(self):
return BERNull.toWire(self)
[docs]class LDAPAttributeDescription(BEROctetString):
pass
[docs]class LDAPAttributeValueAssertion(BERSequence):
[docs] @classmethod
def fromBER(klass, tag, content, berdecoder=None):
l = berDecodeMultiple(content, berdecoder)
assert len(l) == 2
r = klass(attributeDesc=l[0], assertionValue=l[1], tag=tag)
return r
def __init__(
self, attributeDesc=None, assertionValue=None, tag=None, escaper=escape
):
BERSequence.__init__(self, value=[], tag=tag)
assert attributeDesc is not None
self.attributeDesc = attributeDesc
self.assertionValue = assertionValue
self.escaper = escaper
[docs] def toWire(self):
return BERSequence(
[self.attributeDesc, self.assertionValue], tag=self.tag
).toWire()
def __repr__(self):
if self.tag == self.__class__.tag:
return (
self.__class__.__name__
+ "(attributeDesc={}, assertionValue={})".format(
repr(self.attributeDesc),
repr(self.assertionValue),
)
)
else:
return (
self.__class__.__name__
+ "(attributeDesc=%s, assertionValue=%s, tag=%d)"
% (repr(self.attributeDesc), repr(self.assertionValue), self.tag)
)
[docs]class LDAPFilter(BERStructured):
def __init__(self, tag=None):
BERStructured.__init__(self, tag=tag)
[docs]class LDAPFilterSet(BERSet):
[docs] @classmethod
def fromBER(klass, tag, content, berdecoder=None):
l = berDecodeMultiple(
content, LDAPBERDecoderContext_Filter(fallback=berdecoder)
)
r = klass(l, tag=tag)
return r
def __eq__(self, rhs):
# Fast paths
if self is rhs:
return True
elif len(self) != len(rhs):
return False
return sorted(self, key=lambda x: x.toWire()) == sorted(
rhs, key=lambda x: x.toWire()
)
[docs]class LDAPFilter_and(LDAPFilterSet):
tag = CLASS_CONTEXT | 0x00
[docs] def asText(self):
return "(&" + "".join([x.asText() for x in self]) + ")"
[docs]class LDAPFilter_or(LDAPFilterSet):
tag = CLASS_CONTEXT | 0x01
[docs] def asText(self):
return "(|" + "".join([x.asText() for x in self]) + ")"
[docs]class LDAPFilter_not(LDAPFilter):
tag = CLASS_CONTEXT | 0x02
[docs] @classmethod
def fromBER(klass, tag, content, berdecoder=None):
value, bytes = berDecodeObject(
LDAPBERDecoderContext_Filter(fallback=berdecoder, inherit=berdecoder),
content,
)
assert bytes == len(content)
r = klass(value=value, tag=tag)
return r
def __init__(self, value, tag=tag):
LDAPFilter.__init__(self, tag=tag)
assert value is not None
self.value = value
def __repr__(self):
if self.tag == self.__class__.tag:
return self.__class__.__name__ + "(value=%s)" % repr(self.value)
else:
return self.__class__.__name__ + "(value=%s, tag=%d)" % (
repr(self.value),
self.tag,
)
[docs] def toWire(self):
value = to_bytes(self.value)
return bytes((self.identification(),)) + int2berlen(len(value)) + value
[docs] def asText(self):
return "(!" + self.value.asText() + ")"
[docs]class LDAPFilter_equalityMatch(LDAPAttributeValueAssertion):
tag = CLASS_CONTEXT | 0x03
[docs] def asText(self):
return (
"("
+ self.attributeDesc.value
+ "="
+ self.escaper(self.assertionValue.value)
+ ")"
)
[docs]class LDAPFilter_substrings_initial(LDAPString):
tag = CLASS_CONTEXT | 0x00
[docs] def asText(self):
return self.escaper(self.value)
[docs]class LDAPFilter_substrings_any(LDAPString):
tag = CLASS_CONTEXT | 0x01
[docs] def asText(self):
return self.escaper(self.value)
[docs]class LDAPFilter_substrings_final(LDAPString):
tag = CLASS_CONTEXT | 0x02
[docs] def asText(self):
return self.escaper(self.value)
[docs]class LDAPBERDecoderContext_Filter_substrings(BERDecoderContext):
Identities = {
LDAPFilter_substrings_initial.tag: LDAPFilter_substrings_initial,
LDAPFilter_substrings_any.tag: LDAPFilter_substrings_any,
LDAPFilter_substrings_final.tag: LDAPFilter_substrings_final,
}
[docs]class LDAPFilter_substrings(BERSequence):
tag = CLASS_CONTEXT | 0x04
[docs] @classmethod
def fromBER(klass, tag, content, berdecoder=None):
l = berDecodeMultiple(
content, LDAPBERDecoderContext_Filter_substrings(fallback=berdecoder)
)
assert len(l) == 2
assert len(l[1]) >= 1
r = klass(type=l[0].value, substrings=list(l[1]), tag=tag)
return r
def __init__(self, type=None, substrings=None, tag=None):
BERSequence.__init__(self, value=[], tag=tag)
assert type is not None
assert substrings is not None
self.type = type
self.substrings = substrings
[docs] def toWire(self):
return BERSequence(
[LDAPString(self.type), BERSequence(self.substrings)], tag=self.tag
).toWire()
def __repr__(self):
tp = self.type
if self.tag == self.__class__.tag:
return self.__class__.__name__ + "(type={}, substrings={})".format(
repr(tp),
repr(self.substrings),
)
else:
return self.__class__.__name__ + "(type=%s, substrings=%s, tag=%d)" % (
repr(tp),
repr(self.substrings),
self.tag,
)
[docs] def asText(self):
initial = None
final = None
any = []
for s in self.substrings:
assert s is not None
if isinstance(s, LDAPFilter_substrings_initial):
assert initial is None
assert not any
assert final is None
initial = s.asText()
elif isinstance(s, LDAPFilter_substrings_final):
assert final is None
final = s.asText()
elif isinstance(s, LDAPFilter_substrings_any):
assert final is None
any.append(s.asText())
else:
raise NotImplementedError("TODO: Filter type not supported %r" % s)
if initial is None:
initial = ""
if final is None:
final = ""
return "(" + self.type + "=" + "*".join([initial] + any + [final]) + ")"
[docs]class LDAPFilter_greaterOrEqual(LDAPAttributeValueAssertion):
tag = CLASS_CONTEXT | 0x05
[docs] def asText(self):
return (
"("
+ self.attributeDesc.value
+ ">="
+ self.escaper(self.assertionValue.value)
+ ")"
)
[docs]class LDAPFilter_lessOrEqual(LDAPAttributeValueAssertion):
tag = CLASS_CONTEXT | 0x06
[docs] def asText(self):
return (
"("
+ self.attributeDesc.value
+ "<="
+ self.escaper(self.assertionValue.value)
+ ")"
)
[docs]class LDAPFilter_present(LDAPAttributeDescription):
tag = CLASS_CONTEXT | 0x07
[docs] def asText(self):
return "(%s=*)" % self.value
[docs]class LDAPFilter_approxMatch(LDAPAttributeValueAssertion):
tag = CLASS_CONTEXT | 0x08
[docs] def asText(self):
return (
"("
+ self.attributeDesc.value
+ "~="
+ self.escaper(self.assertionValue.value)
+ ")"
)
[docs]class LDAPMatchingRuleId(LDAPString):
pass
[docs]class LDAPAssertionValue(BEROctetString):
pass
[docs]class LDAPMatchingRuleAssertion_matchingRule(LDAPMatchingRuleId):
tag = CLASS_CONTEXT | 0x01
[docs]class LDAPMatchingRuleAssertion_type(LDAPAttributeDescription):
tag = CLASS_CONTEXT | 0x02
[docs]class LDAPMatchingRuleAssertion_matchValue(LDAPAssertionValue):
tag = CLASS_CONTEXT | 0x03
[docs]class LDAPMatchingRuleAssertion_dnAttributes(BERBoolean):
tag = CLASS_CONTEXT | 0x04
[docs]class LDAPBERDecoderContext_MatchingRuleAssertion(BERDecoderContext):
Identities = {
LDAPMatchingRuleAssertion_matchingRule.tag: LDAPMatchingRuleAssertion_matchingRule,
LDAPMatchingRuleAssertion_type.tag: LDAPMatchingRuleAssertion_type,
LDAPMatchingRuleAssertion_matchValue.tag: LDAPMatchingRuleAssertion_matchValue,
LDAPMatchingRuleAssertion_dnAttributes.tag: LDAPMatchingRuleAssertion_dnAttributes,
}
[docs]class LDAPMatchingRuleAssertion(BERSequence):
matchingRule = None
type = None
matchValue = None
dnAttributes = None
[docs] @classmethod
def fromBER(klass, tag, content, berdecoder=None):
matchingRule = None
atype = None
matchValue = None
dnAttributes = None
l = berDecodeMultiple(
content,
LDAPBERDecoderContext_MatchingRuleAssertion(
fallback=berdecoder, inherit=berdecoder
),
)
assert 1 <= len(l) <= 4
if isinstance(l[0], LDAPMatchingRuleAssertion_matchingRule):
matchingRule = l[0]
del l[0]
if len(l) >= 1 and isinstance(l[0], LDAPMatchingRuleAssertion_type):
atype = l[0]
del l[0]
if len(l) >= 1 and isinstance(l[0], LDAPMatchingRuleAssertion_matchValue):
matchValue = l[0]
del l[0]
if len(l) >= 1 and isinstance(l[0], LDAPMatchingRuleAssertion_dnAttributes):
dnAttributes = l[0]
del l[0]
assert matchValue
if not dnAttributes:
dnAttributes = None
r = klass(
matchingRule=matchingRule,
type=atype,
matchValue=matchValue,
dnAttributes=dnAttributes,
tag=tag,
)
return r
def __init__(
self,
matchingRule=None,
type=None,
matchValue=None,
dnAttributes=None,
tag=None,
escaper=escape,
):
BERSequence.__init__(self, value=[], tag=tag)
assert matchValue is not None
if isinstance(matchingRule, (bytes, str)):
matchingRule = LDAPMatchingRuleAssertion_matchingRule(matchingRule)
if isinstance(type, (bytes, str)):
type = LDAPMatchingRuleAssertion_type(type)
if isinstance(matchValue, (bytes, str)):
matchValue = LDAPMatchingRuleAssertion_matchValue(matchValue)
if isinstance(dnAttributes, bool):
dnAttributes = LDAPMatchingRuleAssertion_dnAttributes(dnAttributes)
self.matchingRule = matchingRule
self.type = type
self.matchValue = matchValue
self.dnAttributes = dnAttributes
if not self.dnAttributes:
self.dnAttributes = None
self.escaper = escaper
[docs] def toWire(self):
return BERSequence(
filter(
lambda x: x is not None,
[self.matchingRule, self.type, self.matchValue, self.dnAttributes],
),
tag=self.tag,
).toWire()
def __repr__(self):
l = []
l.append("matchingRule=%s" % repr(self.matchingRule))
l.append("type=%s" % repr(self.type))
l.append("matchValue=%s" % repr(self.matchValue))
l.append("dnAttributes=%s" % repr(self.dnAttributes))
if self.tag != self.__class__.tag:
l.append("tag=%d" % self.tag)
return self.__class__.__name__ + "(" + ", ".join(l) + ")"
[docs]class LDAPFilter_extensibleMatch(LDAPMatchingRuleAssertion):
tag = CLASS_CONTEXT | 0x09
[docs] def asText(self):
return (
"("
+ (self.type.value if self.type else "")
+ (":dn" if self.dnAttributes and self.dnAttributes.value else "")
+ ((":" + self.matchingRule.value) if self.matchingRule else "")
+ ":="
+ self.escaper(self.matchValue.value)
+ ")"
)
[docs]class LDAPBERDecoderContext_Filter(BERDecoderContext):
Identities = {
LDAPFilter_and.tag: LDAPFilter_and,
LDAPFilter_or.tag: LDAPFilter_or,
LDAPFilter_not.tag: LDAPFilter_not,
LDAPFilter_equalityMatch.tag: LDAPFilter_equalityMatch,
LDAPFilter_substrings.tag: LDAPFilter_substrings,
LDAPFilter_greaterOrEqual.tag: LDAPFilter_greaterOrEqual,
LDAPFilter_lessOrEqual.tag: LDAPFilter_lessOrEqual,
LDAPFilter_present.tag: LDAPFilter_present,
LDAPFilter_approxMatch.tag: LDAPFilter_approxMatch,
LDAPFilter_extensibleMatch.tag: LDAPFilter_extensibleMatch,
}
LDAP_SCOPE_baseObject = 0
LDAP_SCOPE_singleLevel = 1
LDAP_SCOPE_wholeSubtree = 2
LDAP_DEREF_neverDerefAliases = 0
LDAP_DEREF_derefInSearching = 1
LDAP_DEREF_derefFindingBaseObj = 2
LDAP_DEREF_derefAlways = 3
LDAPFilterMatchAll = LDAPFilter_present("objectClass")
[docs]class LDAPSearchRequest(LDAPProtocolRequest, BERSequence):
tag = CLASS_APPLICATION | 0x03
baseObject = ""
scope = LDAP_SCOPE_wholeSubtree
derefAliases = LDAP_DEREF_neverDerefAliases
sizeLimit = 0
timeLimit = 0
typesOnly = 0
filter = LDAPFilterMatchAll
attributes = [] # TODO AttributeDescriptionList
# TODO decode
[docs] @classmethod
def fromBER(klass, tag, content, berdecoder=None):
l = berDecodeMultiple(
content,
LDAPBERDecoderContext_Filter(fallback=berdecoder, inherit=berdecoder),
)
assert 8 <= len(l) <= 8
r = klass(
baseObject=l[0].value,
scope=l[1].value,
derefAliases=l[2].value,
sizeLimit=l[3].value,
timeLimit=l[4].value,
typesOnly=l[5].value,
filter=l[6],
attributes=[x.value for x in l[7]],
tag=tag,
)
return r
def __init__(
self,
baseObject=None,
scope=None,
derefAliases=None,
sizeLimit=None,
timeLimit=None,
typesOnly=None,
filter=None,
attributes=None,
tag=None,
):
LDAPProtocolRequest.__init__(self)
BERSequence.__init__(self, [], tag=tag)
if baseObject is not None:
self.baseObject = baseObject
if scope is not None:
self.scope = scope
if derefAliases is not None:
self.derefAliases = derefAliases
if sizeLimit is not None:
self.sizeLimit = sizeLimit
if timeLimit is not None:
self.timeLimit = timeLimit
if typesOnly is not None:
self.typesOnly = typesOnly
if filter is not None:
self.filter = filter
if attributes is not None:
self.attributes = attributes
[docs] def toWire(self):
return BERSequence(
[
BEROctetString(self.baseObject),
BEREnumerated(self.scope),
BEREnumerated(self.derefAliases),
BERInteger(self.sizeLimit),
BERInteger(self.timeLimit),
BERBoolean(self.typesOnly),
self.filter,
BERSequenceOf(map(BEROctetString, self.attributes)),
],
tag=self.tag,
).toWire()
def __repr__(self):
base = self.baseObject
if self.tag == self.__class__.tag:
return self.__class__.__name__ + (
"(baseObject=%s, scope=%s, derefAliases=%s, "
+ "sizeLimit=%s, timeLimit=%s, typesOnly=%s, "
"filter=%s, attributes=%s)"
) % (
repr(base),
self.scope,
self.derefAliases,
self.sizeLimit,
self.timeLimit,
self.typesOnly,
repr(self.filter),
self.attributes,
)
else:
return self.__class__.__name__ + (
"(baseObject=%s, scope=%s, derefAliases=%s, "
+ "sizeLimit=%s, timeLimit=%s, typesOnly=%s, "
"filter=%s, attributes=%s, tag=%d)"
) % (
repr(base),
self.scope,
self.derefAliases,
self.sizeLimit,
self.timeLimit,
self.typesOnly,
repr(self.filter),
self.attributes,
self.tag,
)
[docs]class LDAPSearchResultEntry(LDAPProtocolResponse, BERSequence):
tag = CLASS_APPLICATION | 0x04
[docs] @classmethod
def fromBER(klass, tag, content, berdecoder=None):
l = berDecodeMultiple(
content,
LDAPBERDecoderContext_Filter(fallback=berdecoder, inherit=berdecoder),
)
objectName = l[0].value
attributes = []
for attr, li in l[1].data:
attributes.append((attr.value, [x.value for x in li]))
r = klass(objectName=objectName, attributes=attributes, tag=tag)
return r
def __init__(self, objectName, attributes, tag=None):
LDAPProtocolResponse.__init__(self)
BERSequence.__init__(self, [], tag=tag)
assert objectName is not None
assert attributes is not None
self.objectName = objectName
self.attributes = attributes
[docs] def toWire(self):
return BERSequence(
[
BEROctetString(self.objectName),
BERSequence(
[
BERSequence(
[
BEROctetString(attr_li[0]),
BERSet([BEROctetString(x) for x in attr_li[1]]),
]
)
for attr_li in self.attributes
]
),
],
tag=self.tag,
).toWire()
def __repr__(self):
name = self.objectName
attributes = [(key, [v for v in value]) for (key, value) in self.attributes]
return "{}(objectName={}, attributes={}{})".format(
self.__class__.__name__,
repr(name),
repr(attributes),
f", tag={self.tag}" if self.tag != self.__class__.tag else "",
)
[docs]class LDAPSearchResultDone(LDAPResult):
tag = CLASS_APPLICATION | 0x05
[docs]class LDAPControls(BERSequence):
tag = CLASS_CONTEXT | 0x00
[docs] @classmethod
def fromBER(klass, tag, content, berdecoder=None):
l = berDecodeMultiple(
content, LDAPBERDecoderContext_LDAPControls(inherit=berdecoder)
)
r = klass(l, tag=tag)
return r
[docs]class LDAPControl(BERSequence):
criticality = None
controlValue = None
[docs] @classmethod
def fromBER(klass, tag, content, berdecoder=None):
l = berDecodeMultiple(content, berdecoder)
assert 1 <= len(l) <= 3
kw = {}
if len(l) == 2:
if isinstance(l[1], BERBoolean):
kw["criticality"] = l[1].value
elif isinstance(l[1], BEROctetString):
kw["controlValue"] = l[1].value
elif len(l) == 3:
kw["criticality"] = l[1].value
kw["controlValue"] = l[2].value
r = klass(controlType=l[0].value, tag=tag, **kw)
return r
def __init__(
self, controlType, criticality=None, controlValue=None, id=None, tag=None
):
BERSequence.__init__(self, value=[], tag=tag)
assert controlType is not None
self.controlType = controlType
self.criticality = criticality
self.controlValue = controlValue
[docs] def toWire(self):
self.data = [LDAPOID(self.controlType)]
if self.criticality is not None:
self.data.append(BERBoolean(self.criticality))
if self.controlValue is not None:
self.data.append(BEROctetString(self.controlValue))
return BERSequence.toWire(self)
[docs]class LDAPBERDecoderContext_LDAPControls(BERDecoderContext):
Identities = {
LDAPControl.tag: LDAPControl,
}
[docs]class LDAPBERDecoderContext_LDAPMessage(BERDecoderContext):
Identities = {
LDAPControls.tag: LDAPControls,
LDAPSearchResultReference.tag: LDAPSearchResultReference,
}
[docs]class LDAPBERDecoderContext_TopLevel(BERDecoderContext):
Identities = {
BERSequence.tag: LDAPMessage,
}
[docs]class LDAPModifyRequest(LDAPProtocolRequest, BERSequence):
tag = CLASS_APPLICATION | 0x06
object = None
modification = None
[docs] @classmethod
def fromBER(klass, tag, content, berdecoder=None):
l = berDecodeMultiple(content, berdecoder)
assert len(l) == 2
r = klass(object=l[0].value, modification=l[1].data, tag=tag)
return r
def __init__(self, object=None, modification=None, tag=None):
"""
Initialize the object
Example usage::
l = LDAPModifyRequest(
object='cn=foo,dc=example,dc=com',
modification=[
BERSequence([
BEREnumerated(0),
BERSequence([
LDAPAttributeDescription('attr1'),
BERSet([
LDAPString('value1'),
LDAPString('value2'),
]),
]),
]),
BERSequence([
BEREnumerated(1),
BERSequence([
LDAPAttributeDescription('attr2'),
]),
]),
])
But more likely you just want to say::
mod = delta.ModifyOp('cn=foo,dc=example,dc=com',
[delta.Add('attr1', ['value1', 'value2']),
delta.Delete('attr1', ['value1', 'value2'])])
l = mod.asLDAP()
"""
LDAPProtocolRequest.__init__(self)
BERSequence.__init__(self, [], tag=tag)
self.object = object
self.modification = modification
[docs] def toWire(self):
l = [LDAPString(self.object)]
if self.modification is not None:
l.append(BERSequence(self.modification))
return BERSequence(l, tag=self.tag).toWire()
def __repr__(self):
name = self.object
if self.tag == self.__class__.tag:
return self.__class__.__name__ + "(object={}, modification={})".format(
repr(name),
repr(self.modification),
)
else:
return self.__class__.__name__ + "(object=%s, modification=%s, tag=%d)" % (
repr(name),
repr(self.modification),
self.tag,
)
[docs]class LDAPModifyResponse(LDAPResult):
tag = CLASS_APPLICATION | 0x07
[docs]class LDAPAddRequest(LDAPProtocolRequest, BERSequence):
tag = CLASS_APPLICATION | 0x08
[docs] @classmethod
def fromBER(klass, tag, content, berdecoder=None):
l = berDecodeMultiple(content, berdecoder)
r = klass(entry=l[0].value, attributes=l[1], tag=tag)
return r
def __init__(self, entry=None, attributes=None, tag=None):
"""
Initialize the object
Example usage::
l=LDAPAddRequest(entry='cn=foo,dc=example,dc=com',
attributes=[(LDAPAttributeDescription("attrFoo"),
BERSet(value=(
LDAPAttributeValue("value1"),
LDAPAttributeValue("value2"),
))),
(LDAPAttributeDescription("attrBar"),
BERSet(value=(
LDAPAttributeValue("value1"),
LDAPAttributeValue("value2"),
))),
])"""
LDAPProtocolRequest.__init__(self)
BERSequence.__init__(self, [], tag=tag)
self.entry = entry
self.attributes = attributes
[docs] def toWire(self):
return BERSequence(
[
LDAPString(self.entry),
BERSequence(map(BERSequence, self.attributes)),
],
tag=self.tag,
).toWire()
def __repr__(self):
entry = self.entry
if self.tag == self.__class__.tag:
return self.__class__.__name__ + "(entry={}, attributes={})".format(
repr(entry),
repr(self.attributes),
)
else:
return self.__class__.__name__ + "(entry=%s, attributes=%s, tag=%d)" % (
repr(entry),
repr(self.attributes),
self.tag,
)
[docs]class LDAPAddResponse(LDAPResult):
tag = CLASS_APPLICATION | 0x09
[docs]class LDAPDelRequest(LDAPProtocolRequest, LDAPString):
tag = CLASS_APPLICATION | 0x0A
def __init__(self, value=None, entry=None, tag=None):
"""
Initialize the object
l=LDAPDelRequest(entry='cn=foo,dc=example,dc=com')
"""
if entry is None and value is not None:
entry = value
LDAPProtocolRequest.__init__(self)
LDAPString.__init__(self, value=entry, tag=tag)
[docs] def toWire(self):
return LDAPString.toWire(self)
def __repr__(self):
entry = self.value
if self.tag == self.__class__.tag:
return self.__class__.__name__ + "(entry=%s)" % repr(entry)
else:
return self.__class__.__name__ + "(entry=%s, tag=%d)" % (
repr(entry),
self.tag,
)
[docs]class LDAPDelResponse(LDAPResult):
tag = CLASS_APPLICATION | 0x0B
[docs]class LDAPModifyDNResponse_newSuperior(LDAPString):
tag = CLASS_CONTEXT | 0x00
[docs]class LDAPBERDecoderContext_ModifyDNRequest(BERDecoderContext):
Identities = {
LDAPModifyDNResponse_newSuperior.tag: LDAPModifyDNResponse_newSuperior,
}
[docs]class LDAPModifyDNRequest(LDAPProtocolRequest, BERSequence):
tag = CLASS_APPLICATION | 12
entry = None
newrdn = None
deleteoldrdn = None
newSuperior = None
[docs] @classmethod
def fromBER(klass, tag, content, berdecoder=None):
l = berDecodeMultiple(
content, LDAPBERDecoderContext_ModifyDNRequest(fallback=berdecoder)
)
kw = {}
try:
kw["newSuperior"] = to_bytes(l[3].value)
except IndexError:
pass
r = klass(
entry=to_bytes(l[0].value),
newrdn=to_bytes(l[1].value),
deleteoldrdn=l[2].value,
tag=tag,
**kw,
)
return r
def __init__(self, entry, newrdn, deleteoldrdn, newSuperior=None, tag=None):
"""
Initialize the object
Example usage::
l=LDAPModifyDNRequest(entry='cn=foo,dc=example,dc=com',
newrdn='someAttr=value',
deleteoldrdn=0)
"""
LDAPProtocolRequest.__init__(self)
BERSequence.__init__(self, [], tag=tag)
assert entry is not None
assert newrdn is not None
assert deleteoldrdn is not None
self.entry = entry
self.newrdn = newrdn
self.deleteoldrdn = deleteoldrdn
self.newSuperior = newSuperior
[docs] def toWire(self):
l = [
LDAPString(self.entry),
LDAPString(self.newrdn),
BERBoolean(self.deleteoldrdn),
]
if self.newSuperior is not None:
l.append(LDAPString(self.newSuperior, tag=CLASS_CONTEXT | 0))
return BERSequence(l, tag=self.tag).toWire()
def __repr__(self):
l = [
"entry=%s" % repr(self.entry),
"newrdn=%s" % repr(self.newrdn),
"deleteoldrdn=%s" % repr(self.deleteoldrdn),
]
if self.newSuperior is not None:
l.append("newSuperior=%s" % repr(self.newSuperior))
if self.tag != self.__class__.tag:
l.append("tag=%d" % self.tag)
return self.__class__.__name__ + "(" + ", ".join(l) + ")"
[docs]class LDAPModifyDNResponse(LDAPResult):
tag = CLASS_APPLICATION | 13
[docs]class LDAPBERDecoderContext_Compare(BERDecoderContext):
Identities = {BERSequence.tag: LDAPAttributeValueAssertion}
[docs]class LDAPCompareRequest(LDAPProtocolRequest, BERSequence):
tag = CLASS_APPLICATION | 14
entry = None
ava = None
[docs] @classmethod
def fromBER(klass, tag, content, berdecoder=None):
l = berDecodeMultiple(
content,
LDAPBERDecoderContext_Compare(fallback=berdecoder, inherit=berdecoder),
)
r = klass(entry=l[0].value, ava=l[1], tag=tag)
return r
def __init__(self, entry, ava, tag=None):
LDAPProtocolRequest.__init__(self)
BERSequence.__init__(self, [], tag=tag)
assert entry is not None
assert ava is not None
self.entry = entry
self.ava = ava
[docs] def toWire(self):
l = [LDAPString(self.entry), self.ava]
return BERSequence(l, tag=self.tag).toWire()
def __repr__(self):
l = [
f"entry={repr(self.entry)}",
f"ava={repr(self.ava)}",
]
return "{}({})".format(self.__class__.__name__, ", ".join(l))
[docs]class LDAPCompareResponse(LDAPResult):
tag = CLASS_APPLICATION | 15
[docs]class LDAPAbandonRequest(LDAPProtocolRequest, LDAPInteger):
tag = CLASS_APPLICATION | 0x10
needs_answer = 0
def __init__(self, value=None, id=None, tag=None):
"""
Initialize the object
l=LDAPAbandonRequest(id=1)
"""
if id is None and value is not None:
id = value
LDAPProtocolRequest.__init__(self)
LDAPInteger.__init__(self, value=id, tag=tag)
[docs] def toWire(self):
return LDAPInteger.toWire(self)
def __repr__(self):
if self.tag == self.__class__.tag:
return self.__class__.__name__ + "(id=%s)" % repr(self.value)
else:
return self.__class__.__name__ + "(id=%s, tag=%d)" % (
repr(self.value),
self.tag,
)
[docs]class LDAPOID(BEROctetString):
pass
[docs]class LDAPResponseName(LDAPOID):
tag = CLASS_CONTEXT | 10
[docs]class LDAPResponse(BEROctetString):
tag = CLASS_CONTEXT | 11
[docs]class LDAPBERDecoderContext_LDAPExtendedRequest(BERDecoderContext):
Identities = {
CLASS_CONTEXT | 0x00: BEROctetString,
CLASS_CONTEXT | 0x01: BEROctetString,
}
[docs]class LDAPExtendedRequest(LDAPProtocolRequest, BERSequence):
tag = CLASS_APPLICATION | 23
requestName = None
requestValue = None
[docs] @classmethod
def fromBER(klass, tag, content, berdecoder=None):
l = berDecodeMultiple(
content, LDAPBERDecoderContext_LDAPExtendedRequest(fallback=berdecoder)
)
kw = {}
try:
kw["requestValue"] = l[1].value
except IndexError:
pass
r = klass(requestName=l[0].value, tag=tag, **kw)
return r
def __init__(self, requestName=None, requestValue=None, tag=None):
LDAPProtocolRequest.__init__(self)
BERSequence.__init__(self, [], tag=tag)
assert requestName is not None
assert isinstance(requestName, (bytes, str))
assert requestValue is None or isinstance(requestValue, (bytes, str))
self.requestName = requestName
self.requestValue = requestValue
[docs] def toWire(self):
l = [LDAPOID(self.requestName, tag=CLASS_CONTEXT | 0)]
if self.requestValue is not None:
value = to_bytes(self.requestValue)
l.append(BEROctetString(value, tag=CLASS_CONTEXT | 1))
return BERSequence(l, tag=self.tag).toWire()
[docs]class LDAPPasswordModifyRequest_userIdentity(BEROctetString):
tag = CLASS_CONTEXT | 0
[docs]class LDAPPasswordModifyRequest_passwd(BEROctetString):
def __repr__(self):
value = "*" * len(self.value)
return "{}(value={}{})".format(
self.__class__.__name__,
repr(value),
f", tag={self.tag}" if self.tag != self.__class__.tag else "",
)
[docs]class LDAPPasswordModifyRequest_oldPasswd(LDAPPasswordModifyRequest_passwd):
tag = CLASS_CONTEXT | 1
[docs]class LDAPPasswordModifyRequest_newPasswd(LDAPPasswordModifyRequest_passwd):
tag = CLASS_CONTEXT | 2
[docs]class LDAPBERDecoderContext_LDAPPasswordModifyRequest(BERDecoderContext):
Identities = {
LDAPPasswordModifyRequest_userIdentity.tag: LDAPPasswordModifyRequest_userIdentity,
LDAPPasswordModifyRequest_oldPasswd.tag: LDAPPasswordModifyRequest_oldPasswd,
LDAPPasswordModifyRequest_newPasswd.tag: LDAPPasswordModifyRequest_newPasswd,
}
[docs]class LDAPPasswordModifyRequest(LDAPExtendedRequest):
oid = b"1.3.6.1.4.1.4203.1.11.1"
def __init__(
self,
requestName=None,
userIdentity=None,
oldPasswd=None,
newPasswd=None,
tag=None,
):
assert (
requestName is None or requestName == self.oid
), "{} requestName was {} instead of {}".format(
self.__class__.__name__,
requestName,
self.oid,
)
# TODO genPasswd
l = []
self.userIdentity = None
if userIdentity is not None:
self.userIdentity = LDAPPasswordModifyRequest_userIdentity(userIdentity)
l.append(self.userIdentity)
self.oldPasswd = None
if oldPasswd is not None:
self.oldPasswd = LDAPPasswordModifyRequest_oldPasswd(oldPasswd)
l.append(self.oldPasswd)
self.newPasswd = None
if newPasswd is not None:
self.newPasswd = LDAPPasswordModifyRequest_newPasswd(newPasswd)
l.append(self.newPasswd)
LDAPExtendedRequest.__init__(
self, requestName=self.oid, requestValue=BERSequence(l).toWire(), tag=tag
)
def __repr__(self):
l = []
if self.userIdentity is not None:
l.append(f"userIdentity={repr(self.userIdentity)}")
if self.oldPasswd is not None:
l.append(f"oldPasswd={repr(self.oldPasswd)}")
if self.newPasswd is not None:
l.append(f"newPasswd={repr(self.newPasswd)}")
if self.tag != self.__class__.tag:
l.append("tag=%d" % self.tag)
return self.__class__.__name__ + "(" + ", ".join(l) + ")"
[docs]class LDAPBERDecoderContext_LDAPExtendedResponse(BERDecoderContext):
Identities = {
LDAPResponseName.tag: LDAPResponseName,
LDAPResponse.tag: LDAPResponse,
}
[docs]class LDAPExtendedResponse(LDAPResult):
tag = CLASS_APPLICATION | 0x18
responseName = None
response = None
[docs] @classmethod
def fromBER(klass, tag, content, berdecoder=None):
l = berDecodeMultiple(
content, LDAPBERDecoderContext_LDAPExtendedResponse(fallback=berdecoder)
)
assert 3 <= len(l) <= 6
referral = None
responseName = None
response = None
for obj in l[3:]:
if isinstance(obj, LDAPResponseName):
responseName = obj.value
elif isinstance(obj, LDAPResponse):
response = obj.value
elif isinstance(obj, LDAPReferral):
# TODO support referrals
# self.referral=self.data[0]
pass
else:
assert False
r = klass(
resultCode=l[0].value,
matchedDN=l[1].value,
errorMessage=l[2].value,
referral=referral,
responseName=responseName,
response=response,
tag=tag,
)
return r
def __init__(
self,
resultCode=None,
matchedDN=None,
errorMessage=None,
referral=None,
serverSaslCreds=None,
responseName=None,
response=None,
tag=None,
):
LDAPResult.__init__(
self,
resultCode=resultCode,
matchedDN=matchedDN,
errorMessage=errorMessage,
referral=referral,
serverSaslCreds=serverSaslCreds,
tag=tag,
)
self.responseName = responseName
self.response = response
[docs] def toWire(self):
assert self.referral is None # TODO
l = [
BEREnumerated(self.resultCode),
BEROctetString(self.matchedDN),
BEROctetString(self.errorMessage),
# TODO referral [3] Referral OPTIONAL
]
if self.responseName is not None:
l.append(LDAPOID(self.responseName, tag=CLASS_CONTEXT | 0x0A))
if self.response is not None:
l.append(BEROctetString(self.response, tag=CLASS_CONTEXT | 0x0B))
return BERSequence(l, tag=self.tag).toWire()
[docs]class LDAPStartTLSRequest(LDAPExtendedRequest):
"""
Request to start Transport Layer Security.
See RFC 2830 for details.
"""
oid = b"1.3.6.1.4.1.1466.20037"
def __init__(self, requestName=None, tag=None):
assert (
requestName is None or requestName == self.oid
), "{} requestName was {} instead of {}".format(
self.__class__.__name__,
requestName,
self.oid,
)
LDAPExtendedRequest.__init__(self, requestName=self.oid, tag=tag)
def __repr__(self):
l = []
if self.tag != self.__class__.tag:
l.append(f"tag={self.tag}")
return self.__class__.__name__ + "(" + ", ".join(l) + ")"
[docs]class LDAPStartTLSResponse(LDAPExtendedResponse):
"""
Response to start Transport Layer Security.
See RFC 4511 section 4.14.2 for details.
"""
oid = b"1.3.6.1.4.1.1466.20037"
def __init__(
self,
resultCode=None,
matchedDN=None,
errorMessage=None,
referral=None,
serverSaslCreds=None,
responseName=None,
response=None,
tag=None,
):
LDAPExtendedResponse.__init__(
self,
resultCode=resultCode,
matchedDN=matchedDN,
errorMessage=errorMessage,
referral=referral,
serverSaslCreds=serverSaslCreds,
responseName=responseName,
response=response,
tag=tag,
)
def __repr__(self):
l = []
if self.tag != self.__class__.tag:
l.append(f"tag={self.tag}")
return self.__class__.__name__ + "(" + ", ".join(l) + ")"
[docs]class LDAPBERDecoderContext(BERDecoderContext):
Identities = {
LDAPBindResponse.tag: LDAPBindResponse,
LDAPBindRequest.tag: LDAPBindRequest,
LDAPUnbindRequest.tag: LDAPUnbindRequest,
LDAPSearchRequest.tag: LDAPSearchRequest,
LDAPSearchResultEntry.tag: LDAPSearchResultEntry,
LDAPSearchResultDone.tag: LDAPSearchResultDone,
LDAPSearchResultReference.tag: LDAPSearchResultReference,
LDAPReferral.tag: LDAPReferral,
LDAPModifyRequest.tag: LDAPModifyRequest,
LDAPModifyResponse.tag: LDAPModifyResponse,
LDAPAddRequest.tag: LDAPAddRequest,
LDAPAddResponse.tag: LDAPAddResponse,
LDAPDelRequest.tag: LDAPDelRequest,
LDAPDelResponse.tag: LDAPDelResponse,
LDAPExtendedRequest.tag: LDAPExtendedRequest,
LDAPExtendedResponse.tag: LDAPExtendedResponse,
LDAPModifyDNRequest.tag: LDAPModifyDNRequest,
LDAPModifyDNResponse.tag: LDAPModifyDNResponse,
LDAPAbandonRequest.tag: LDAPAbandonRequest,
LDAPCompareRequest.tag: LDAPCompareRequest,
LDAPCompareResponse.tag: LDAPCompareResponse,
}