Source code for ldaptor.protocols.ldap.ldifprotocol
import base64
from twisted.protocols import basic
from twisted.internet import protocol
from ldaptor import entry
[docs]class LDIFParseError(Exception):
"""Error parsing LDIF."""
def __str__(self):
s = self.__doc__
if self[:]:
s = ': '.join([s]+map(str, self[:]))
return s+'.'
[docs]class LDIFLineWithoutSemicolonError(LDIFParseError):
"""LDIF line without semicolon seen"""
pass
[docs]class LDIFEntryStartsWithNonDNError(LDIFParseError):
"""LDIF entry starts with a non-DN line"""
pass
[docs]class LDIFEntryStartsWithSpaceError(LDIFParseError):
"""Invalid LDIF value format"""
pass
[docs]class LDIFVersionNotANumberError(LDIFParseError):
"""Non-numeric LDIF version number"""
pass
[docs]class LDIFUnsupportedVersionError(LDIFParseError):
"""LDIF version not supported"""
pass
[docs]class LDIFTruncatedError(LDIFParseError):
"""LDIF appears to be truncated"""
pass
HEADER = 'HEADER'
WAIT_FOR_DN = 'WAIT_FOR_DN'
IN_ENTRY = 'IN_ENTRY'
[docs]class LDIF(object, basic.LineReceiver):
delimiter='\n'
mode = HEADER
dn = None
data = None
lastLine = None
version = None
[docs] def logicalLineReceived(self, line):
if line.startswith('#'):
# comments are allowed everywhere
return
getattr(self, 'state_' + self.mode)(line)
[docs] def lineReceived(self, line):
if line.startswith(' '):
if self.lastLine is None:
raise LDIFEntryStartsWithSpaceError
self.lastLine = self.lastLine + line[1:]
else:
if self.lastLine is not None:
self.logicalLineReceived(self.lastLine)
self.lastLine = line
if line == '':
self.logicalLineReceived(line)
self.lastLine = None
[docs] def parseValue(self, val):
if val.startswith(':'):
return base64.decodestring(val[1:].lstrip(' '))
elif val.startswith('<'):
raise NotImplementedError
else:
return val.lstrip(' ')
def _parseLine(self, line):
try:
key, val = line.split(':', 1)
except ValueError:
# unpack list of wrong size
# -> invalid input data
raise LDIFLineWithoutSemicolonError, line
val = self.parseValue(val)
return key, val
[docs] def state_WAIT_FOR_DN(self, line):
assert self.dn is None, 'self.dn must not be set when waiting for DN'
assert self.data is None, 'self.data must not be set when waiting for DN'
if line == '':
# too many empty lines, but be tolerant
return
key, val = self._parseLine(line)
if key.upper() != 'DN':
raise LDIFEntryStartsWithNonDNError, line
self.dn = val
self.data = {}
self.mode = IN_ENTRY
[docs] def state_IN_ENTRY(self, line):
assert self.dn is not None, 'self.dn must be set when in entry'
assert self.data is not None, 'self.data must be set when in entry'
if line == '':
# end of entry
self.mode = WAIT_FOR_DN
o = entry.BaseLDAPEntry(dn=self.dn,
attributes=self.data)
self.dn = None
self.data = None
self.gotEntry(o)
return
key, val = self._parseLine(line)
if not key in self.data:
self.data[key] = []
self.data[key].append(val)
[docs] def gotEntry(self, obj):
pass
[docs] def connectionLost(self, reason=protocol.connectionDone):
if self.mode != WAIT_FOR_DN:
raise LDIFTruncatedError, reason