"""Pure, simple, BER encoding and decoding"""
# This BER library is currently aimed at supporting LDAP, thus
# the following restrictions from RFC2251 apply:
#
# (1) Only the definite form of length encoding will be used.
#
# (2) OCTET STRING values will be encoded in the primitive form
# only.
#
# (3) If the value of a BOOLEAN type is true, the encoding MUST have
# its contents octets set to hex "FF".
#
# (4) If a value of a type is its default value, it MUST be absent.
# Only some BOOLEAN and INTEGER types have default values in
# this protocol definition.
from collections import UserList
from ldaptor._encoder import to_bytes, WireStrAlias
# xxxxxxxx
# |/|\.../
# | | |
# | | tag
# | |
# | primitive (0) or structured (1)
# |
# class
CLASS_MASK = 0xC0
CLASS_UNIVERSAL = 0x00
CLASS_APPLICATION = 0x40
CLASS_CONTEXT = 0x80
CLASS_PRIVATE = 0xC0
STRUCTURED_MASK = 0x20
STRUCTURED = 0x20
NOT_STRUCTURED = 0x00
TAG_MASK = 0x1F
# LENGTH
# 0xxxxxxx = 0..127
# 1xxxxxxx = len is stored in the next 0xxxxxxx octets
# indefinite form not supported
[docs]class UnknownBERTag(Exception):
def __init__(self, tag, context):
Exception.__init__(self)
self.tag = tag
self.context = context
def __str__(self):
return "BERDecoderContext has no tag 0x{:02x}: {}".format(
self.tag, self.context
)
[docs]def berDecodeLength(m, offset=0):
"""
Return a tuple of (length, lengthLength).
m must be atleast one byte long.
"""
l = ber2int(m[offset + 0 : offset + 1])
ll = 1
if l & 0x80:
ll = 1 + (l & 0x7F)
need(m, offset + ll)
l = ber2int(m[offset + 1 : offset + ll], signed=0)
return (l, ll)
[docs]def int2berlen(i):
assert i >= 0
e = int2ber(i, signed=False)
if i <= 127:
return e
else:
l = len(e)
assert l > 0
assert l <= 127
return bytes((0x80 | l,)) + e
[docs]def int2ber(i, signed=True):
encoded = b""
while (signed and (i > 127 or i < -128)) or (not signed and (i > 255)):
encoded = bytes((i % 256,)) + encoded
i = i >> 8
encoded = bytes((i % 256,)) + encoded
return encoded
[docs]def ber2int(e, signed=True):
need(e, 1)
v = 0 + ord(e[0:1])
if v & 0x80 and signed:
v = v - 256
for i in range(1, len(e)):
v = (v << 8) | ord(e[i : i + 1])
return v
[docs]class BERBase(WireStrAlias):
tag = None
[docs] def identification(self):
return self.tag
def __init__(self, tag=None):
if tag is not None:
self.tag = tag
def __len__(self):
return len(self.toWire())
def __eq__(self, other):
if not isinstance(other, BERBase):
return NotImplemented
return self.toWire() == other.toWire()
def __ne__(self, other):
if not isinstance(other, BERBase):
return NotImplemented
return self.toWire() != other.toWire()
def __hash__(self):
return hash(self.toWire())
[docs] def toWire(self):
return b""
[docs]class BERStructured(BERBase):
[docs] def identification(self):
return STRUCTURED | self.tag
[docs]class BERException(Exception):
pass
[docs]class BERExceptionInsufficientData(Exception):
pass
[docs]def need(buf, n):
d = n - len(buf)
if d > 0:
raise BERExceptionInsufficientData(d)
[docs]class BERInteger(BERBase):
tag = 0x02
value = None
[docs] @classmethod
def fromBER(klass, tag, content, berdecoder=None):
assert len(content) > 0
value = ber2int(content)
r = klass(value=value, tag=tag)
return r
def __init__(self, value=None, tag=None):
"""Create a new BERInteger object.
value is an integer.
"""
BERBase.__init__(self, tag)
assert value is not None
self.value = value
[docs] def toWire(self):
encoded = int2ber(self.value)
return bytes((self.identification(),)) + int2berlen(len(encoded)) + encoded
def __repr__(self):
if self.tag == self.__class__.tag:
return self.__class__.__name__ + "(value=%r)" % self.value
else:
return self.__class__.__name__ + "(value=%r, tag=%d)" % (
self.value,
self.tag,
)
[docs]class BEROctetString(BERBase):
tag = 0x04
value = None
[docs] @classmethod
def fromBER(klass, tag, content, berdecoder=None):
assert len(content) >= 0
r = klass(value=content, tag=tag)
return r
def __init__(self, value=None, tag=None):
BERBase.__init__(self, tag)
assert value is not None
self.value = value
[docs] def toWire(self):
value = to_bytes(self.value)
result = bytes((self.identification(),)) + int2berlen(len(value)) + value
return result
def __repr__(self):
value = self.value
if self.tag == self.__class__.tag:
return self.__class__.__name__ + "(value=%s)" % repr(value)
else:
return self.__class__.__name__ + "(value=%s, tag=%d)" % (
repr(value),
self.tag,
)
[docs]class BERNull(BERBase):
tag = 0x05
[docs] @classmethod
def fromBER(klass, tag, content, berdecoder=None):
assert len(content) == 0
r = klass(tag=tag)
return r
def __init__(self, tag=None):
BERBase.__init__(self, tag)
[docs] def toWire(self):
return bytes((self.identification(),)) + bytes((0,))
def __repr__(self):
if self.tag == self.__class__.tag:
return self.__class__.__name__ + "()"
else:
return self.__class__.__name__ + "(tag=%d)" % self.tag
[docs]class BERBoolean(BERBase):
tag = 0x01
[docs] @classmethod
def fromBER(klass, tag, content, berdecoder=None):
assert len(content) > 0
value = ber2int(content)
r = klass(value=value, tag=tag)
return r
def __init__(self, value=None, tag=None):
"""Create a new BERInteger object.
value is an integer.
"""
BERBase.__init__(self, tag)
assert value is not None
if value:
value = 0xFF
self.value = value
[docs] def toWire(self):
assert self.value == 0 or self.value == 0xFF
return bytes((self.identification(),)) + int2berlen(1) + bytes((self.value,))
def __repr__(self):
if self.tag == self.__class__.tag:
return self.__class__.__name__ + "(value=%d)" % self.value
else:
return self.__class__.__name__ + "(value=%d, tag=%d)" % (
self.value,
self.tag,
)
[docs]class BEREnumerated(BERInteger):
tag = 0x0A
[docs]class BERSequence(BERStructured, UserList):
# TODO __getslice__ calls __init__ with no args.
tag = 0x10
[docs] @classmethod
def fromBER(klass, tag, content, berdecoder=None):
l = berDecodeMultiple(content, berdecoder)
r = klass(l, tag=tag)
return r
def __init__(self, value=None, tag=None):
BERStructured.__init__(self, tag)
assert value is not None
UserList.__init__(self, value)
[docs] def toWire(self):
r = b"".join(to_bytes(x) for x in self.data)
return bytes((self.identification(),)) + int2berlen(len(r)) + r
def __repr__(self):
if self.tag == self.__class__.tag:
return self.__class__.__name__ + "(value=%s)" % repr(self.data)
else:
return self.__class__.__name__ + "(value=%s, tag=%d)" % (
repr(self.data),
self.tag,
)
[docs]class BERSequenceOf(BERSequence):
pass
[docs]class BERSet(BERSequence):
tag = 0x11
[docs]class BERDecoderContext:
Identities = {
BERBoolean.tag: BERBoolean,
BERInteger.tag: BERInteger,
BEROctetString.tag: BEROctetString,
BERNull.tag: BERNull,
BEREnumerated.tag: BEREnumerated,
BERSequence.tag: BERSequence,
BERSet.tag: BERSet,
}
def __init__(self, fallback=None, inherit=None):
self.fallback = fallback
self.inherit_context = inherit
[docs] def lookup_id(self, id):
try:
return self.Identities[id]
except KeyError:
if self.fallback:
return self.fallback.lookup_id(id)
else:
return None
[docs] def inherit(self):
return self.inherit_context or self
def __repr__(self):
identities = []
for tag, class_ in self.Identities.items():
identities.append(f"0x{tag:02x}: {class_.__name__}")
return (
"<"
+ self.__class__.__name__
+ " identities={%s}" % ", ".join(identities)
+ " fallback="
+ repr(self.fallback)
+ " inherit="
+ repr(self.inherit_context)
+ ">"
)
[docs]def berDecodeObject(context, m):
"""berDecodeObject(context, bytes) -> (berobject, bytesUsed)
berobject may be None.
"""
while m:
need(m, 2)
i = ber2int(m[0:1], signed=0) & (CLASS_MASK | TAG_MASK)
length, lenlen = berDecodeLength(m, offset=1)
need(m, 1 + lenlen + length)
m2 = m[1 + lenlen : 1 + lenlen + length]
berclass = context.lookup_id(i)
if berclass:
inh = context.inherit()
assert inh
r = berclass.fromBER(tag=i, content=m2, berdecoder=inh)
return (r, 1 + lenlen + length)
else:
print(str(UnknownBERTag(i, context))) # TODO
return (None, 1 + lenlen + length)
return (None, 0)
[docs]def berDecodeMultiple(content, berdecoder):
"""berDecodeMultiple(content, berdecoder) -> [objects]
Decodes everything in content and returns a list of decoded
objects.
All of content will be decoded, and content must contain complete
BER objects.
"""
l = []
while content:
n, bytes = berDecodeObject(berdecoder, content)
if n is not None:
l.append(n)
assert bytes <= len(content)
content = content[bytes:]
return l
# TODO unimplemented classes are below:
# class BERObjectIdentifier(BERBase):
# tag = 0x06
# pass
# class BERIA5String(BERBase):
# tag = 0x16
# pass
# class BERPrintableString(BERBase):
# tag = 0x13
# pass
# class BERT61String(BERBase):
# tag = 0x14
# pass
# class BERUTCTime(BERBase):
# tag = 0x17
# pass
# class BERBitString(BERBase):
# tag = 0x03
# pass