# Copyright (C) 2001 Tommi Virtanen
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of version 2.1 of the GNU Lesser General Public
# License as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""LDAP protocol message conversion; no application logic here."""
from pureber import (
BERBoolean, BERDecoderContext, BEREnumerated, BERInteger, BERNull,
BEROctetString, BERSequence, BERSequenceOf, BERSet, BERStructured,
CLASS_APPLICATION, CLASS_CONTEXT,
berDecodeMultiple, berDecodeObject, int2berlen,
)
next_ldap_message_id=1
[docs]def alloc_ldap_message_id():
global next_ldap_message_id
r=next_ldap_message_id
next_ldap_message_id=next_ldap_message_id+1
return r
[docs]def escape(s):
s = s.replace('\\', r'\5c')
s = s.replace('*', r'\2a')
s = s.replace('(', r'\28')
s = s.replace(')', r'\29')
s = s.replace('\0', r'\00')
return s
[docs]class LDAPInteger(BERInteger):
pass
[docs]class LDAPString(BEROctetString):
pass
[docs]class LDAPAttributeValue(BEROctetString):
pass
[docs]class LDAPMessage(BERSequence):
id = None
value = None
[docs] def fromBER(klass, tag, content, berdecoder=None):
l = berDecodeMultiple(content, berdecoder)
id_=l[0].value
value=l[1]
if l[2:]:
controls = []
for c in l[2]:
controls.append((
c.controlType,
c.criticality,
c.controlValue,
))
else:
controls = None
assert not l[3:]
r = klass(id=id_,
value=value,
controls=controls,
tag=tag)
return r
fromBER = classmethod(fromBER)
def __init__(self, value=None, controls=None, id=None, tag=None):
BERSequence.__init__(self, value=[], tag=tag)
assert value is not None
self.id=id
if self.id is None:
self.id=alloc_ldap_message_id()
self.value=value
self.controls = controls
def __str__(self):
l = [BERInteger(self.id), self.value]
if self.controls is not None:
l.append(LDAPControls([LDAPControl(*a) for a in self.controls]))
return str(BERSequence(l))
def __repr__(self):
l=[]
l.append('id=%r' % self.id)
l.append('value=%r' % self.value)
if self.tag!=self.__class__.tag:
l.append('tag=%d' % self.tag)
return self.__class__.__name__+'('+', '.join(l)+')'
[docs]class LDAPProtocolOp:
def __init__(self):
pass
def __str__(self):
raise NotImplementedError
[docs]class LDAPProtocolRequest(LDAPProtocolOp):
needs_answer=1
pass
[docs]class LDAPProtocolResponse(LDAPProtocolOp):
pass
[docs]class LDAPBERDecoderContext_LDAPBindRequest(BERDecoderContext):
Identities = {
CLASS_CONTEXT|0x00: BEROctetString,
CLASS_CONTEXT|0x03: BERSequence,
}
[docs]class LDAPBindRequest(LDAPProtocolRequest, BERSequence):
tag=CLASS_APPLICATION|0x00
[docs] def fromBER(klass, tag, content, berdecoder=None):
l = berDecodeMultiple(content,
LDAPBERDecoderContext_LDAPBindRequest(
fallback=berdecoder))
sasl = False
auth = None
if isinstance(l[2], BEROctetString):
auth = l[2].value
elif isinstance(l[2], BERSequence):
auth = (l[2][0].value, l[2][1].value)
sasl = True
r = klass(version=l[0].value,
dn=l[1].value,
auth=auth,
tag=tag,
sasl=sasl)
return r
fromBER = classmethod(fromBER)
def __init__(self, version=None, dn=None, auth=None, tag=None, sasl=False):
"""Constructor for LDAP Bind Request
For sasl=False, pass a string password for 'auth'
For sasl=True, pass a tuple of (mechanism, credentials) for 'auth'"""
LDAPProtocolRequest.__init__(self)
BERSequence.__init__(self, [], tag=tag)
self.version=version
if self.version is None:
self.version=3
self.dn=dn
if self.dn is None:
self.dn=''
self.auth=auth
if self.auth is None:
self.auth=''
assert(not sasl)
self.sasl=sasl
def __str__(self):
if not self.sasl:
auth_ber = BEROctetString(self.auth, tag=CLASS_CONTEXT|0)
else:
auth_ber = BERSequence([BEROctetString(self.auth[0]), BEROctetString(self.auth[1])], tag=CLASS_CONTEXT|3)
return str(BERSequence([
BERInteger(self.version),
BEROctetString(self.dn),
auth_ber,
], tag=self.tag))
def __repr__(self):
l=[]
l.append('version=%d' % self.version)
l.append('dn=%s' % repr(self.dn))
l.append('auth=%s' % repr(self.auth))
if self.tag!=self.__class__.tag:
l.append('tag=%d' % self.tag)
l.append('sasl=%s' % repr(self.sasl))
return self.__class__.__name__+'('+', '.join(l)+')'
[docs]class LDAPReferral(BERSequence):
tag = CLASS_CONTEXT | 0x03
# This is currently just a stub and implements no real functionality.
[docs]class LDAPSearchResultReference(LDAPProtocolResponse):
tag = CLASS_APPLICATION | 0x13
def __init__(self):
LDAPProtocolResponse.__init__(self)
[docs] def fromBER(cls, tag, content, berdecoder=None):
r = cls()
return r
fromBER = classmethod(fromBER)
def __str__(self):
return object.__str__(self)
def __repr__(self):
return object.__repr__(self)
[docs]class LDAPResult(LDAPProtocolResponse, BERSequence):
[docs] def fromBER(klass, tag, content, berdecoder=None):
l = berDecodeMultiple(content, LDAPBERDecoderContext_LDAPBindRequest(
fallback=berdecoder))
assert 3<=len(l)<=4
referral = None
#if (l[3:] and isinstance(l[3], LDAPReferral)):
#TODO support referrals
#self.referral=self.data[0]
r = klass(resultCode=l[0].value,
matchedDN=l[1].value,
errorMessage=l[2].value,
referral=referral,
tag=tag)
return r
fromBER = classmethod(fromBER)
def __init__(self, resultCode=None, matchedDN=None, errorMessage=None, referral=None, serverSaslCreds=None, tag=None):
LDAPProtocolResponse.__init__(self)
BERSequence.__init__(self, value=[], tag=tag)
assert resultCode is not None
self.resultCode=resultCode
if matchedDN is None:
matchedDN=''
self.matchedDN=matchedDN
if errorMessage is None:
errorMessage=''
self.errorMessage=errorMessage
self.referral=referral
self.serverSaslCreds=serverSaslCreds
def __str__(self):
assert self.referral is None #TODO
return str(BERSequence([
BEREnumerated(self.resultCode),
BEROctetString(self.matchedDN),
BEROctetString(self.errorMessage),
#TODO referral [3] Referral OPTIONAL
], tag=self.tag))
def __repr__(self):
l=[]
l.append('resultCode=%r' % self.resultCode)
if self.matchedDN:
l.append('matchedDN=%r' % str(self.matchedDN))
if self.errorMessage:
l.append('errorMessage=%r' % str(self.errorMessage))
if self.referral:
l.append('referral=%r' % self.referral)
if self.tag!=self.__class__.tag:
l.append('tag=%d' % self.tag)
return self.__class__.__name__+'('+', '.join(l)+')'
[docs]class LDAPBindResponse_serverSaslCreds(BEROctetString):
tag = CLASS_CONTEXT|0x07
pass
[docs]class LDAPBERDecoderContext_BindResponse(BERDecoderContext):
Identities = {
LDAPBindResponse_serverSaslCreds.tag: LDAPBindResponse_serverSaslCreds,
}
[docs]class LDAPBindResponse(LDAPResult):
tag=CLASS_APPLICATION|0x01
resultCode = None
matchedDN = None
errorMessage = None
referral = None
serverSaslCreds = None
[docs] def fromBER(klass, tag, content, berdecoder=None):
l = berDecodeMultiple(content, LDAPBERDecoderContext_BindResponse(
fallback=berdecoder))
assert 3<=len(l)<=4
try:
if isinstance(l[3], LDAPBindResponse_serverSaslCreds):
serverSaslCreds=l[3]
else:
serverSaslCreds=None
except IndexError:
serverSaslCreds=None
referral = None
#if (l[3:] and isinstance(l[3], LDAPReferral)):
#TODO support referrals
#self.referral=self.data[0]
r = klass(resultCode=l[0].value,
matchedDN=l[1].value,
errorMessage=l[2].value,
referral=referral,
serverSaslCreds=serverSaslCreds,
tag=tag)
return r
fromBER = classmethod(fromBER)
def __init__(self, resultCode=None, matchedDN=None, errorMessage=None, referral=None, serverSaslCreds=None, tag=None):
LDAPResult.__init__(self, resultCode=resultCode, matchedDN=matchedDN, errorMessage=errorMessage,
referral=referral, serverSaslCreds=serverSaslCreds, tag=None)
def __str__(self):
return LDAPResult.__str__(self)
def __repr__(self):
return LDAPResult.__repr__(self)
[docs]class LDAPUnbindRequest(LDAPProtocolRequest, BERNull):
tag=CLASS_APPLICATION|0x02
needs_answer=0
def __init__(self, *args, **kwargs):
LDAPProtocolRequest.__init__(self)
BERNull.__init__(self, *args, **kwargs)
def __str__(self):
return BERNull.__str__(self)
[docs]class LDAPAttributeDescription(BEROctetString):
pass
[docs]class LDAPAttributeValueAssertion(BERSequence):
[docs] def fromBER(klass, tag, content, berdecoder=None):
l = berDecodeMultiple(content, berdecoder)
assert len(l) == 2
r = klass(attributeDesc=l[0],
assertionValue=l[1],
tag=tag)
return r
fromBER = classmethod(fromBER)
def __init__(self, attributeDesc=None, assertionValue=None, tag=None):
BERSequence.__init__(self, value=[], tag=tag)
assert attributeDesc is not None
self.attributeDesc=attributeDesc
self.assertionValue=assertionValue
def __str__(self):
return str(BERSequence([self.attributeDesc,
self.assertionValue],
tag=self.tag))
def __repr__(self):
if self.tag==self.__class__.tag:
return self.__class__.__name__+"(attributeDesc=%s, assertionValue=%s)"\
%(repr(self.attributeDesc), repr(self.assertionValue))
else:
return self.__class__.__name__+"(attributeDesc=%s, assertionValue=%s, tag=%d)"\
%(repr(self.attributeDesc), repr(self.assertionValue), self.tag)
[docs]class LDAPFilter(BERStructured):
def __init__(self, tag=None):
BERStructured.__init__(self, tag=tag)
[docs]class LDAPFilterSet(BERSet):
[docs] def fromBER(klass, tag, content, berdecoder=None):
l = berDecodeMultiple(content, LDAPBERDecoderContext_Filter(fallback=berdecoder))
r = klass(l, tag=tag)
return r
fromBER = classmethod(fromBER)
[docs]class LDAPFilter_and(LDAPFilterSet):
tag = CLASS_CONTEXT|0x00
[docs] def asText(self):
return '(&'+''.join([x.asText() for x in self])+')'
[docs]class LDAPFilter_or(LDAPFilterSet):
tag = CLASS_CONTEXT|0x01
[docs] def asText(self):
return '(|'+''.join([x.asText() for x in self])+')'
[docs]class LDAPFilter_not(LDAPFilter):
tag = CLASS_CONTEXT|0x02
[docs] def fromBER(klass, tag, content, berdecoder=None):
value, bytes = berDecodeObject(LDAPBERDecoderContext_Filter(fallback=berdecoder, inherit=berdecoder), content)
assert bytes == len(content)
r = klass(value=value,
tag=tag)
return r
fromBER = classmethod(fromBER)
def __init__(self, value, tag=tag):
LDAPFilter.__init__(self, tag=tag)
assert value is not None
self.value=value
def __repr__(self):
if self.tag==self.__class__.tag:
return self.__class__.__name__\
+"(value=%s)"\
%repr(self.value)
else:
return self.__class__.__name__\
+"(value=%s, tag=%d)"\
%(repr(self.value), self.tag)
def __str__(self):
r=str(self.value)
return chr(self.identification())+int2berlen(len(r))+r
[docs] def asText(self):
return '(!'+self.value.asText()+')'
[docs]class LDAPFilter_equalityMatch(LDAPAttributeValueAssertion):
tag = CLASS_CONTEXT|0x03
[docs] def asText(self):
return '('+self.attributeDesc.value+'=' \
+escape(self.assertionValue.value)+')'
[docs]class LDAPFilter_substrings_initial(LDAPString):
tag = CLASS_CONTEXT|0x00
[docs] def asText(self):
return escape(self.value)
[docs]class LDAPFilter_substrings_any(LDAPString):
tag = CLASS_CONTEXT|0x01
[docs] def asText(self):
return escape(self.value)
[docs]class LDAPFilter_substrings_final(LDAPString):
tag = CLASS_CONTEXT|0x02
[docs] def asText(self):
return escape(self.value)
[docs]class LDAPBERDecoderContext_Filter_substrings(BERDecoderContext):
Identities = {
LDAPFilter_substrings_initial.tag: LDAPFilter_substrings_initial,
LDAPFilter_substrings_any.tag: LDAPFilter_substrings_any,
LDAPFilter_substrings_final.tag: LDAPFilter_substrings_final,
}
[docs]class LDAPFilter_substrings(BERSequence):
tag = CLASS_CONTEXT|0x04
[docs] def fromBER(klass, tag, content, berdecoder=None):
l = berDecodeMultiple(content, LDAPBERDecoderContext_Filter_substrings(fallback=berdecoder))
assert len(l) == 2
assert len(l[1])>=1
r = klass(type=l[0].value,
substrings=list(l[1]),
tag=tag)
return r
fromBER = classmethod(fromBER)
def __init__(self, type=None, substrings=None, tag=None):
BERSequence.__init__(self, value=[], tag=tag)
assert type is not None
assert substrings is not None
self.type=type
self.substrings=substrings
def __str__(self):
return str(BERSequence([
LDAPString(self.type),
BERSequence(self.substrings)], tag=self.tag))
def __repr__(self):
if self.tag==self.__class__.tag:
return self.__class__.__name__\
+"(type=%s, substrings=%s)"\
%(repr(self.type), repr(self.substrings))
else:
return self.__class__.__name__\
+"(type=%s, substrings=%s, tag=%d)"\
%(repr(self.type), repr(self.substrings), self.tag)
[docs] def asText(self):
initial=None
final=None
any=[]
for s in self.substrings:
assert s is not None
if isinstance(s, LDAPFilter_substrings_initial):
assert initial is None
assert not any
assert final is None
initial=s.asText()
elif isinstance(s, LDAPFilter_substrings_final):
assert final is None
final=s.asText()
elif isinstance(s, LDAPFilter_substrings_any):
assert final is None
any.append(s.asText())
else:
raise NotImplementedError('TODO: Filter type not supported %r' % s)
if initial is None:
initial=''
if final is None:
final=''
return '('+self.type+'=' \
+'*'.join([initial]+any+[final])+')'
[docs]class LDAPFilter_greaterOrEqual(LDAPAttributeValueAssertion):
tag = CLASS_CONTEXT|0x05
[docs] def asText(self):
return '('+self.attributeDesc.value+'>=' \
+escape(self.assertionValue.value)+')'
[docs]class LDAPFilter_lessOrEqual(LDAPAttributeValueAssertion):
tag = CLASS_CONTEXT|0x06
[docs] def asText(self):
return '('+self.attributeDesc.value+'<=' \
+escape(self.assertionValue.value)+')'
[docs]class LDAPFilter_present(LDAPAttributeDescription):
tag = CLASS_CONTEXT|0x07
[docs] def asText(self):
return '(%s=*)' % self.value
[docs]class LDAPFilter_approxMatch(LDAPAttributeValueAssertion):
tag = CLASS_CONTEXT|0x08
[docs] def asText(self):
return '('+self.attributeDesc.value+'~=' \
+escape(self.assertionValue.value)+')'
[docs]class LDAPMatchingRuleId(LDAPString):
pass
[docs]class LDAPAssertionValue(BEROctetString):
pass
[docs]class LDAPMatchingRuleAssertion_matchingRule(LDAPMatchingRuleId):
tag = CLASS_CONTEXT|0x01
pass
[docs]class LDAPMatchingRuleAssertion_type(LDAPAttributeDescription):
tag = CLASS_CONTEXT|0x02
pass
[docs]class LDAPMatchingRuleAssertion_matchValue(LDAPAssertionValue):
tag = CLASS_CONTEXT|0x03
pass
[docs]class LDAPMatchingRuleAssertion_dnAttributes(BERBoolean):
tag = CLASS_CONTEXT|0x04
pass
[docs]class LDAPBERDecoderContext_MatchingRuleAssertion(BERDecoderContext):
Identities = {
LDAPMatchingRuleAssertion_matchingRule.tag: LDAPMatchingRuleAssertion_matchingRule,
LDAPMatchingRuleAssertion_type.tag: LDAPMatchingRuleAssertion_type,
LDAPMatchingRuleAssertion_matchValue.tag: LDAPMatchingRuleAssertion_matchValue,
LDAPMatchingRuleAssertion_dnAttributes.tag: LDAPMatchingRuleAssertion_dnAttributes,
}
[docs]class LDAPMatchingRuleAssertion(BERSequence):
matchingRule=None
type=None
matchValue=None
dnAttributes=None
[docs] def fromBER(klass, tag, content, berdecoder=None):
matchingRule = None
atype = None
matchValue = None
dnAttributes = None
l = berDecodeMultiple(content, LDAPBERDecoderContext_MatchingRuleAssertion(fallback=berdecoder, inherit=berdecoder))
assert 1 <= len(l) <= 4
if isinstance(l[0], LDAPMatchingRuleAssertion_matchingRule):
matchingRule=l[0]
del l[0]
if len(l)>=1 and isinstance(l[0], LDAPMatchingRuleAssertion_type):
atype = l[0]
del l[0]
if len(l) >= 1 and isinstance(l[0], LDAPMatchingRuleAssertion_matchValue):
matchValue = l[0]
del l[0]
if len(l) >= 1 and isinstance(l[0], LDAPMatchingRuleAssertion_dnAttributes):
dnAttributes = l[0]
del l[0]
assert matchValue
if not dnAttributes:
dnAttributes=None
r = klass(
matchingRule=matchingRule,
type=atype,
matchValue=matchValue,
dnAttributes=dnAttributes,
tag=tag)
return r
fromBER = classmethod(fromBER)
def __init__(self, matchingRule=None, type=None, matchValue=None, dnAttributes=None, tag=None):
BERSequence.__init__(self, value=[], tag=tag)
assert matchValue is not None
self.matchingRule=matchingRule
self.type=type
self.matchValue=matchValue
self.dnAttributes=dnAttributes
if not self.dnAttributes:
self.dnAttributes=None
def __str__(self):
return str(BERSequence(
filter(lambda x: x is not None, [self.matchingRule, self.type, self.matchValue, self.dnAttributes]), tag=self.tag))
def __repr__(self):
l=[]
l.append('matchingRule=%s' % repr(self.matchingRule))
l.append('type=%s' % repr(self.type))
l.append('matchValue=%s' % repr(self.matchValue))
l.append('dnAttributes=%s' % repr(self.dnAttributes))
if self.tag!=self.__class__.tag:
l.append('tag=%d' % self.tag)
return self.__class__.__name__+'('+', '.join(l)+')'
[docs]class LDAPFilter_extensibleMatch(LDAPMatchingRuleAssertion):
tag = CLASS_CONTEXT|0x09
pass
[docs]class LDAPBERDecoderContext_Filter(BERDecoderContext):
Identities = {
LDAPFilter_and.tag: LDAPFilter_and,
LDAPFilter_or.tag: LDAPFilter_or,
LDAPFilter_not.tag: LDAPFilter_not,
LDAPFilter_equalityMatch.tag: LDAPFilter_equalityMatch,
LDAPFilter_substrings.tag: LDAPFilter_substrings,
LDAPFilter_greaterOrEqual.tag: LDAPFilter_greaterOrEqual,
LDAPFilter_lessOrEqual.tag: LDAPFilter_lessOrEqual,
LDAPFilter_present.tag: LDAPFilter_present,
LDAPFilter_approxMatch.tag: LDAPFilter_approxMatch,
LDAPFilter_extensibleMatch.tag: LDAPFilter_extensibleMatch,
}
LDAP_SCOPE_baseObject=0
LDAP_SCOPE_singleLevel=1
LDAP_SCOPE_wholeSubtree=2
LDAP_DEREF_neverDerefAliases=0
LDAP_DEREF_derefInSearching=1
LDAP_DEREF_derefFindingBaseObj=2
LDAP_DEREF_derefAlways=3
LDAPFilterMatchAll = LDAPFilter_present('objectClass')
[docs]class LDAPSearchRequest(LDAPProtocolRequest, BERSequence):
tag=CLASS_APPLICATION|0x03
baseObject=''
scope=LDAP_SCOPE_wholeSubtree
derefAliases=LDAP_DEREF_neverDerefAliases
sizeLimit=0
timeLimit=0
typesOnly=0
filter=LDAPFilterMatchAll
attributes=[] #TODO AttributeDescriptionList
#TODO decode
[docs] def fromBER(klass, tag, content, berdecoder=None):
l = berDecodeMultiple(content, LDAPBERDecoderContext_Filter(fallback=berdecoder, inherit=berdecoder))
assert 8<=len(l)<=8
r = klass(baseObject=l[0].value,
scope=l[1].value,
derefAliases=l[2].value,
sizeLimit=l[3].value,
timeLimit=l[4].value,
typesOnly=l[5].value,
filter=l[6],
attributes=[x.value for x in l[7]],
tag=tag)
return r
fromBER = classmethod(fromBER)
def __init__(self,
baseObject=None,
scope=None,
derefAliases=None,
sizeLimit=None,
timeLimit=None,
typesOnly=None,
filter=None,
attributes=None,
tag=None):
LDAPProtocolRequest.__init__(self)
BERSequence.__init__(self, [], tag=tag)
if baseObject is not None:
self.baseObject=baseObject
if scope is not None:
self.scope=scope
if derefAliases is not None:
self.derefAliases=derefAliases
if sizeLimit is not None:
self.sizeLimit=sizeLimit
if timeLimit is not None:
self.timeLimit=timeLimit
if typesOnly is not None:
self.typesOnly=typesOnly
if filter is not None:
self.filter=filter
if attributes is not None:
self.attributes=attributes
def __str__(self):
return str(BERSequence([
BEROctetString(self.baseObject),
BEREnumerated(self.scope),
BEREnumerated(self.derefAliases),
BERInteger(self.sizeLimit),
BERInteger(self.timeLimit),
BERBoolean(self.typesOnly),
self.filter,
BERSequenceOf(map(BEROctetString, self.attributes)),
], tag=self.tag))
def __repr__(self):
if self.tag==self.__class__.tag:
return self.__class__.__name__\
+("(baseObject=%s, scope=%s, derefAliases=%s, " \
+"sizeLimit=%s, timeLimit=%s, typesOnly=%s, " \
"filter=%s, attributes=%s)") \
%(repr(self.baseObject), self.scope,
self.derefAliases, self.sizeLimit,
self.timeLimit, self.typesOnly,
repr(self.filter), self.attributes)
else:
return self.__class__.__name__\
+("(baseObject=%s, scope=%s, derefAliases=%s, " \
+"sizeLimit=%s, timeLimit=%s, typesOnly=%s, " \
"filter=%s, attributes=%s, tag=%d)") \
%(repr(self.baseObject), self.scope,
self.derefAliases, self.sizeLimit,
self.timeLimit, self.typesOnly,
self.filter, self.attributes, self.tag)
[docs]class LDAPSearchResultEntry(LDAPProtocolResponse, BERSequence):
tag=CLASS_APPLICATION|0x04
[docs] def fromBER(klass, tag, content, berdecoder=None):
l = berDecodeMultiple(content, LDAPBERDecoderContext_Filter(fallback=berdecoder, inherit=berdecoder))
objectName=l[0].value
attributes=[]
for attr, li in l[1].data:
attributes.append((attr.value, map(lambda x: x.value, li)))
r = klass(objectName=objectName,
attributes=attributes,
tag=tag)
return r
fromBER = classmethod(fromBER)
def __init__(self, objectName, attributes, tag=None):
LDAPProtocolResponse.__init__(self)
BERSequence.__init__(self, [], tag=tag)
assert objectName is not None
assert attributes is not None
self.objectName=objectName
self.attributes=attributes
def __str__(self):
return str(BERSequence([
BEROctetString(self.objectName),
BERSequence(map(lambda (attr,li):
BERSequence([BEROctetString(attr),
BERSet(map(BEROctetString,
li))]),
self.attributes)),
], tag=self.tag))
def __repr__(self):
if self.tag==self.__class__.tag:
return self.__class__.__name__\
+"(objectName=%s, attributes=%s"\
%(repr(str(self.objectName)),
repr(map(lambda (a,l):
(str(a),
map(lambda i, l=l: str(i), l)),
self.attributes)))
else:
return self.__class__.__name__\
+"(objectName=%s, attributes=%s, tag=%d"\
%(repr(str(self.objectName)),
repr(map(lambda (a,l):
(str(a),
map(lambda i, l=l: str(i), l)),
self.attributes)),
self.tag)
[docs]class LDAPSearchResultDone(LDAPResult):
tag=CLASS_APPLICATION|0x05
pass
[docs]class LDAPControls(BERSequence):
tag = CLASS_CONTEXT|0x00
[docs] def fromBER(klass, tag, content, berdecoder=None):
l = berDecodeMultiple(content, LDAPBERDecoderContext_LDAPControls(
inherit=berdecoder))
r = klass(l, tag=tag)
return r
fromBER = classmethod(fromBER)
[docs]class LDAPControl(BERSequence):
criticality = None
controlValue = None
[docs] def fromBER(klass, tag, content, berdecoder=None):
l = berDecodeMultiple(content, berdecoder)
assert 1<=len(l)<= 3
kw = {}
if len(l) == 2:
if isinstance(l[1], BERBoolean):
kw['criticality'] = l[1].value
elif isinstance(l[1], BEROctetString):
kw['controlValue'] = l[1].value
elif len(l) == 3:
kw['criticality'] = l[1].value
kw['controlValue'] = l[2].value
r = klass(controlType=l[0].value,
tag=tag,
**kw)
return r
fromBER = classmethod(fromBER)
def __init__(self,
controlType, criticality=None, controlValue=None,
id=None, tag=None):
BERSequence.__init__(self, value=[], tag=tag)
assert controlType is not None
self.controlType = controlType
self.criticality = criticality
self.controlValue = controlValue
def __str__(self):
self.data=[LDAPOID(self.controlType)]
if self.criticality is not None:
self.data.append(BERBoolean(self.criticality))
if self.controlValue is not None:
self.data.append(BEROctetString(self.controlValue))
return BERSequence.__str__(self)
[docs]class LDAPBERDecoderContext_LDAPControls(BERDecoderContext):
Identities = {
LDAPControl.tag: LDAPControl,
}
[docs]class LDAPBERDecoderContext_LDAPMessage(BERDecoderContext):
Identities = {
LDAPControls.tag: LDAPControls,
LDAPSearchResultReference.tag: LDAPSearchResultReference,
}
[docs]class LDAPBERDecoderContext_TopLevel(BERDecoderContext):
Identities = {
BERSequence.tag: LDAPMessage,
}
[docs]class LDAPModifyRequest(LDAPProtocolRequest, BERSequence):
tag=CLASS_APPLICATION|0x06
object = None
modification = None
[docs] def fromBER(klass, tag, content, berdecoder=None):
l = berDecodeMultiple(content, berdecoder)
assert len(l) == 2
r = klass(object=l[0].value,
modification=l[1].data,
tag=tag)
return r
fromBER = classmethod(fromBER)
def __init__(self, object=None, modification=None, tag=None):
"""
Initialize the object
Example usage::
l = LDAPModifyRequest(
object='cn=foo,dc=example,dc=com',
modification=[
BERSequence([
BEREnumerated(0),
BERSequence([
LDAPAttributeDescription('attr1'),
BERSet([
LDAPString('value1'),
LDAPString('value2'),
]),
]),
]),
BERSequence([
BEREnumerated(1),
BERSequence([
LDAPAttributeDescription('attr2'),
]),
]),
])
But more likely you just want to say::
mod = delta.ModifyOp('cn=foo,dc=example,dc=com',
[delta.Add('attr1', ['value1', 'value2']),
delta.Delete('attr1', ['value1', 'value2'])])
l = mod.asLDAP()
"""
LDAPProtocolRequest.__init__(self)
BERSequence.__init__(self, [], tag=tag)
self.object=object
self.modification=modification
def __str__(self):
l=[LDAPString(self.object)]
if self.modification is not None:
l.append(BERSequence(self.modification))
return str(BERSequence(l, tag=self.tag))
def __repr__(self):
if self.tag==self.__class__.tag:
return self.__class__.__name__+"(object=%s, modification=%s)"\
%(repr(self.object), repr(self.modification))
else:
return self.__class__.__name__+"(object=%s, modification=%s, tag=%d)" \
%(repr(self.object), repr(self.modification), self.tag)
[docs]class LDAPModifyResponse(LDAPResult):
tag = CLASS_APPLICATION|0x07
[docs]class LDAPAddRequest(LDAPProtocolRequest, BERSequence):
tag = CLASS_APPLICATION|0x08
[docs] def fromBER(klass, tag, content, berdecoder=None):
l = berDecodeMultiple(content, berdecoder)
r = klass(entry=l[0].value,
attributes=l[1],
tag=tag)
return r
fromBER = classmethod(fromBER)
def __init__(self, entry=None, attributes=None, tag=None):
"""
Initialize the object
Example usage::
l=LDAPAddRequest(entry='cn=foo,dc=example,dc=com',
attributes=[(LDAPAttributeDescription("attrFoo"),
BERSet(value=(
LDAPAttributeValue("value1"),
LDAPAttributeValue("value2"),
))),
(LDAPAttributeDescription("attrBar"),
BERSet(value=(
LDAPAttributeValue("value1"),
LDAPAttributeValue("value2"),
))),
])
"""
LDAPProtocolRequest.__init__(self)
BERSequence.__init__(self, [], tag=tag)
self.entry=entry
self.attributes=attributes
def __str__(self):
return str(BERSequence([
LDAPString(self.entry),
BERSequence(map(BERSequence, self.attributes)),
], tag=self.tag))
def __repr__(self):
if self.tag==self.__class__.tag:
return self.__class__.__name__+"(entry=%s, attributes=%s)"\
%(repr(self.entry), repr(self.attributes))
else:
return self.__class__.__name__+"(entry=%s, attributes=%s, tag=%d)" \
%(repr(self.entry), repr(self.attributes), self.tag)
[docs]class LDAPAddResponse(LDAPResult):
tag = CLASS_APPLICATION|0x09
[docs]class LDAPDelRequest(LDAPProtocolRequest, LDAPString):
tag = CLASS_APPLICATION|0x0a
def __init__(self, value=None, entry=None, tag=None):
"""
Initialize the object
l=LDAPDelRequest(entry='cn=foo,dc=example,dc=com')
"""
if entry is None and value is not None:
entry = value
LDAPProtocolRequest.__init__(self)
LDAPString.__init__(self, value=entry, tag=tag)
def __str__(self):
return LDAPString.__str__(self)
def __repr__(self):
if self.tag==self.__class__.tag:
return self.__class__.__name__+"(entry=%s)" \
%repr(self.value)
else:
return self.__class__.__name__ \
+"(entry=%s, tag=%d)" \
%(repr(self.value), self.tag)
[docs]class LDAPDelResponse(LDAPResult):
tag = CLASS_APPLICATION|0x0b
pass
[docs]class LDAPModifyDNResponse_newSuperior(LDAPString):
tag = CLASS_CONTEXT|0x00
pass
[docs]class LDAPBERDecoderContext_ModifyDNRequest(BERDecoderContext):
Identities = {
LDAPModifyDNResponse_newSuperior.tag: LDAPModifyDNResponse_newSuperior,
}
[docs]class LDAPModifyDNRequest(LDAPProtocolRequest, BERSequence):
tag=CLASS_APPLICATION|12
entry=None
newrdn=None
deleteoldrdn=None
newSuperior=None
[docs] def fromBER(klass, tag, content, berdecoder=None):
l = berDecodeMultiple(content, LDAPBERDecoderContext_ModifyDNRequest(fallback=berdecoder))
kw = {}
try:
kw['newSuperior'] = str(l[3].value)
except IndexError:
pass
r = klass(entry=str(l[0].value),
newrdn=str(l[1].value),
deleteoldrdn=l[2].value,
tag=tag,
**kw)
return r
fromBER = classmethod(fromBER)
def __init__(self, entry, newrdn, deleteoldrdn, newSuperior=None,
tag=None):
"""
Initialize the object
Example usage::
l=LDAPModifyDNRequest(entry='cn=foo,dc=example,dc=com',
newrdn='someAttr=value',
deleteoldrdn=0)
"""
LDAPProtocolRequest.__init__(self)
BERSequence.__init__(self, [], tag=tag)
assert entry is not None
assert newrdn is not None
assert deleteoldrdn is not None
self.entry=entry
self.newrdn=newrdn
self.deleteoldrdn=deleteoldrdn
self.newSuperior=newSuperior
def __str__(self):
l=[
LDAPString(self.entry),
LDAPString(self.newrdn),
BERBoolean(self.deleteoldrdn),
]
if self.newSuperior is not None:
l.append(LDAPString(self.newSuperior, tag=CLASS_CONTEXT|0))
return str(BERSequence(l, tag=self.tag))
def __repr__(self):
l = [
"entry=%s" % repr(self.entry),
"newrdn=%s" % repr(self.newrdn),
"deleteoldrdn=%s" % repr(self.deleteoldrdn),
]
if self.newSuperior is not None:
l.append("newSuperior=%s" % repr(self.newSuperior))
if self.tag!=self.__class__.tag:
l.append("tag=%d" % self.tag)
return self.__class__.__name__ + "(" + ', '.join(l) + ")"
[docs]class LDAPModifyDNResponse(LDAPResult):
tag=CLASS_APPLICATION|13
#class LDAPCompareResponse(LDAPProtocolResponse):
#class LDAPCompareRequest(LDAPProtocolRequest):
[docs]class LDAPAbandonRequest(LDAPProtocolRequest, LDAPInteger):
tag = CLASS_APPLICATION|0x10
needs_answer=0
def __init__(self, value=None, id=None, tag=None):
"""
Initialize the object
l=LDAPAbandonRequest(id=1)
"""
if id is None and value is not None:
id = value
LDAPProtocolRequest.__init__(self)
LDAPInteger.__init__(self, value=id, tag=tag)
def __str__(self):
return LDAPInteger.__str__(self)
def __repr__(self):
if self.tag==self.__class__.tag:
return self.__class__.__name__+"(id=%s)" \
%repr(self.value)
else:
return self.__class__.__name__ \
+"(id=%s, tag=%d)" \
%(repr(self.value), self.tag)
[docs]class LDAPOID(BEROctetString):
pass
[docs]class LDAPResponseName(LDAPOID):
tag = CLASS_CONTEXT|10
[docs]class LDAPResponse(BEROctetString):
tag = CLASS_CONTEXT|11
[docs]class LDAPBERDecoderContext_LDAPExtendedRequest(BERDecoderContext):
Identities = {
CLASS_CONTEXT|0x00: BEROctetString,
CLASS_CONTEXT|0x01: BEROctetString,
}
[docs]class LDAPExtendedRequest(LDAPProtocolRequest, BERSequence):
tag = CLASS_APPLICATION|23
requestName = None
requestValue = None
[docs] def fromBER(klass, tag, content, berdecoder=None):
l = berDecodeMultiple(content,
LDAPBERDecoderContext_LDAPExtendedRequest(
fallback=berdecoder))
kw = {}
try:
kw['requestValue'] = l[1].value
except IndexError:
pass
r = klass(requestName=l[0].value,
tag=tag,
**kw)
return r
fromBER = classmethod(fromBER)
def __init__(self, requestName=None, requestValue=None,
tag=None):
LDAPProtocolRequest.__init__(self)
BERSequence.__init__(self, [], tag=tag)
assert requestName is not None
assert isinstance(requestName, basestring)
assert requestValue is None or isinstance(requestValue, basestring)
self.requestName=requestName
self.requestValue=requestValue
def __str__(self):
l=[LDAPOID(self.requestName, tag=CLASS_CONTEXT|0)]
if self.requestValue is not None:
l.append(BEROctetString(str(self.requestValue), tag=CLASS_CONTEXT|1))
return str(BERSequence(l, tag=self.tag))
[docs]class LDAPPasswordModifyRequest_userIdentity(BEROctetString):
tag=CLASS_CONTEXT|0
[docs]class LDAPPasswordModifyRequest_oldPasswd(BEROctetString):
tag=CLASS_CONTEXT|1
[docs]class LDAPPasswordModifyRequest_newPasswd(BEROctetString):
tag=CLASS_CONTEXT|2
[docs]class LDAPBERDecoderContext_LDAPPasswordModifyRequest(BERDecoderContext):
Identities = {
LDAPPasswordModifyRequest_userIdentity.tag:
LDAPPasswordModifyRequest_userIdentity,
LDAPPasswordModifyRequest_oldPasswd.tag:
LDAPPasswordModifyRequest_oldPasswd,
LDAPPasswordModifyRequest_newPasswd.tag:
LDAPPasswordModifyRequest_newPasswd,
}
[docs]class LDAPPasswordModifyRequest(LDAPExtendedRequest):
oid = '1.3.6.1.4.1.4203.1.11.1'
def __init__(self, requestName=None,
userIdentity=None, oldPasswd=None, newPasswd=None,
tag=None):
assert (requestName is None
or requestName == self.oid), \
'%s requestName was %s instead of %s' \
% (self.__class__.__name__, requestName, self.oid)
#TODO genPasswd
l=[]
if userIdentity is not None:
l.append(LDAPPasswordModifyRequest_userIdentity(userIdentity))
if oldPasswd is not None:
l.append(LDAPPasswordModifyRequest_oldPasswd(oldPasswd))
if newPasswd is not None:
l.append(LDAPPasswordModifyRequest_newPasswd(newPasswd))
LDAPExtendedRequest.__init__(
self,
requestName=self.oid,
requestValue=str(BERSequence(l)),
tag=tag)
def __repr__(self):
l=[]
# TODO userIdentity, oldPasswd, newPasswd
if self.tag!=self.__class__.tag:
l.append('tag=%d' % self.tag)
return self.__class__.__name__+'('+', '.join(l)+')'
[docs]class LDAPBERDecoderContext_LDAPExtendedResponse(BERDecoderContext):
Identities = {
LDAPResponseName.tag: LDAPResponseName,
LDAPResponse.tag: LDAPResponse,
}
[docs]class LDAPExtendedResponse(LDAPResult):
tag = CLASS_APPLICATION|0x18
responseName = None
response = None
[docs] def fromBER(klass, tag, content, berdecoder=None):
l = berDecodeMultiple(content, LDAPBERDecoderContext_LDAPExtendedResponse(
fallback=berdecoder))
assert 3<=len(l)<=4
referral = None
#if (l[3:] and isinstance(l[3], LDAPReferral)):
#TODO support referrals
#self.referral=self.data[0]
r = klass(resultCode=l[0].value,
matchedDN=l[1].value,
errorMessage=l[2].value,
referral=referral,
tag=tag)
return r
fromBER = classmethod(fromBER)
def __init__(self, resultCode=None, matchedDN=None, errorMessage=None,
referral=None, serverSaslCreds=None,
responseName=None, response=None,
tag=None):
LDAPResult.__init__(self,
resultCode=resultCode,
matchedDN=matchedDN,
errorMessage=errorMessage,
referral=referral,
serverSaslCreds=serverSaslCreds)
self.responseName=responseName
self.response=response
def __str__(self):
assert self.referral is None #TODO
l=[BEREnumerated(self.resultCode),
BEROctetString(self.matchedDN),
BEROctetString(self.errorMessage),
#TODO referral [3] Referral OPTIONAL
]
if self.responseName is not None:
l.append(LDAPOID(self.responseName, tag=CLASS_CONTEXT|0x0a))
if self.response is not None:
l.append(BEROctetString(self.response, tag=CLASS_CONTEXT|0x0b))
return str(BERSequence(l, tag=self.tag))
[docs]class LDAPStartTLSRequest(LDAPExtendedRequest):
"""
Request to start Transport Layer Security.
See RFC 2830 for details.
"""
oid = '1.3.6.1.4.1.1466.20037'
def __init__(self, requestName=None, tag=None):
assert (requestName is None
or requestName == self.oid), \
'%s requestName was %s instead of %s' \
% (self.__class__.__name__, requestName, self.oid)
LDAPExtendedRequest.__init__(
self,
requestName=self.oid,
tag=tag)
def __repr__(self):
l = []
if self.tag != self.__class__.tag:
l.append('tag={0}'.format(self.tag))
return self.__class__.__name__+'('+', '.join(l)+')'
[docs]class LDAPStartTLSResponse(LDAPExtendedResponse):
"""
Response to start Transport Layer Security.
See RFC 4511 section 4.14.2 for details.
"""
oid = '1.3.6.1.4.1.1466.20037'
def __init__(self, resultCode=None, matchedDN=None, errorMessage=None,
referral=None, serverSaslCreds=None,
responseName=None, response=None,
tag=None):
LDAPExtendedResponse.__init__(self,
resultCode=resultCode,
matchedDN=matchedDN,
errorMessage=errorMessage,
referral=referral,
serverSaslCreds=serverSaslCreds,
responseName=responseName,
response=response,
tag=tag)
def __repr__(self):
l = []
if self.tag!=self.__class__.tag:
l.append('tag={0}'.format(self.tag))
return self.__class__.__name__+'('+', '.join(l)+')'
[docs]class LDAPBERDecoderContext(BERDecoderContext):
Identities = {
LDAPBindResponse.tag: LDAPBindResponse,
LDAPBindRequest.tag: LDAPBindRequest,
LDAPUnbindRequest.tag: LDAPUnbindRequest,
LDAPSearchRequest.tag: LDAPSearchRequest,
LDAPSearchResultEntry.tag: LDAPSearchResultEntry,
LDAPSearchResultDone.tag: LDAPSearchResultDone,
LDAPReferral.tag: LDAPReferral,
LDAPModifyRequest.tag: LDAPModifyRequest,
LDAPModifyResponse.tag: LDAPModifyResponse,
LDAPAddRequest.tag: LDAPAddRequest,
LDAPAddResponse.tag: LDAPAddResponse,
LDAPDelRequest.tag: LDAPDelRequest,
LDAPDelResponse.tag: LDAPDelResponse,
LDAPExtendedRequest.tag: LDAPExtendedRequest,
LDAPExtendedResponse.tag: LDAPExtendedResponse,
LDAPModifyDNRequest.tag: LDAPModifyDNRequest,
LDAPModifyDNResponse.tag: LDAPModifyDNResponse,
LDAPAbandonRequest.tag: LDAPAbandonRequest,
}