# Source code for ldaptor.protocols.ldap.ldifdelta
from twisted.python.failure import Failure
from twisted.internet import error
from ldaptor.protocols.ldap import ldifprotocol
from ldaptor import delta, entry
# Parser states used by LDIFDelta for LDIF change records. LDIFDelta stores
# one of these in ``self.mode``; each name matches a ``state_<NAME>`` method
# on that class.
# Set after the dn line: the next line must be a changetype line.
WAIT_FOR_CHANGETYPE = b"WAIT_FOR_CHANGETYPE"
# Inside a "modify" record, between mod-specs (or at the end of the record).
WAIT_FOR_MOD_SPEC = b"WAIT_FOR_MOD_SPEC"
# Inside one mod-spec, collecting values until the closing "-" line.
IN_MOD_SPEC = b"IN_MOD_SPEC"
# Inside an "add" record, collecting attribute lines.
IN_ADD_ENTRY = b"IN_ADD_ENTRY"
# Inside a "delete" record; only the terminating empty line is accepted.
IN_DELETE = b"IN_DELETE"
# Raised by LDIFDelta.state_WAIT_FOR_CHANGETYPE when the line following the
# dn is empty or its key is not b"changetype".
class LDIFDeltaMissingChangeTypeError(ldifprotocol.LDIFParseError):
    """LDIF delta entry has no changetype."""
# Raised by LDIFDelta.state_WAIT_FOR_CHANGETYPE when the changetype value is
# none of modify, add, delete, modrdn or moddn.
class LDIFDeltaUnknownChangeTypeError(ldifprotocol.LDIFParseError):
    """LDIF delta entry has an unknown changetype."""
# Raised by LDIFDelta.state_WAIT_FOR_MOD_SPEC when the key of a mod-spec
# header line is not one of the keys of LDIFDelta.MOD_SPEC_TO_DELTA.
class LDIFDeltaUnknownModificationError(ldifprotocol.LDIFParseError):
    """LDIF delta modification has unknown mod-spec."""
# Raised by LDIFDelta.state_IN_MOD_SPEC when an empty line arrives before the
# "-" line that closes the current mod-spec.
class LDIFDeltaModificationMissingEndDashError(ldifprotocol.LDIFParseError):
    """LDIF delta modification has no ending dash."""
# Raised by LDIFDelta.state_IN_MOD_SPEC when a value line names a different
# attribute than the one given on the mod-spec header line.
class LDIFDeltaModificationDifferentAttributeTypeError(ldifprotocol.LDIFParseError):
    """The attribute type for the change is not the same as in the mod-spec header line."""
# Raised by LDIFDelta.state_IN_ADD_ENTRY when an "add" record ends without
# any attribute having been collected.
class LDIFDeltaAddMissingAttributesError(ldifprotocol.LDIFParseError):
    """Add operation needs to have at least one attribute type and value."""
# Raised by LDIFDelta.state_IN_DELETE when a "delete" record contains any
# non-empty line after its changetype line.
class LDIFDeltaDeleteHasJunkAfterChangeTypeError(ldifprotocol.LDIFParseError):
    """Delete operation takes no attribute types or values."""
class LDIFDelta(ldifprotocol.LDIF):
    """Line-driven parser for LDIF change records ("LDIF delta").

    Builds on ``ldifprotocol.LDIF``: after the dn line of each record a
    ``changetype`` line is required, and the remainder of the record is
    parsed according to that changetype. Every completed record is passed to
    ``self.gotEntry`` as a ``delta.ModifyOp``, ``delta.AddOp`` or
    ``delta.DeleteOp``.

    The current parser state is kept in ``self.mode``. The ``state_*``
    methods are presumably dispatched by the base class according to that
    mode (the dispatch code is not part of this module).
    """

    def state_WAIT_FOR_DN(self, line):
        """Handle the dn line, then require a changetype line next.

        Delegates to the base class. If that left the parser in
        ``ldifprotocol.IN_ENTRY``, the mode is redirected to
        ``WAIT_FOR_CHANGETYPE`` so the next line is treated as a changetype
        line rather than a plain attribute line.
        """
        super().state_WAIT_FOR_DN(line)
        if self.mode == ldifprotocol.IN_ENTRY:
            self.mode = WAIT_FOR_CHANGETYPE

    def state_WAIT_FOR_CHANGETYPE(self, line):
        """Parse the ``changetype`` line and select the matching state.

        :param line: the line following the dn line.
        :raises LDIFDeltaMissingChangeTypeError: the line is empty or its
            key is not ``b"changetype"``.
        :raises NotImplementedError: the changetype is ``modrdn`` or
            ``moddn``.
        :raises LDIFDeltaUnknownChangeTypeError: any other unrecognised
            changetype value.
        """
        assert self.dn is not None, "self.dn must be set when in entry"
        assert self.data is not None, "self.data must be set when in entry"

        if line == b"":
            raise LDIFDeltaMissingChangeTypeError(self.dn)

        key, val = self._parseLine(line)

        if key != b"changetype":
            raise LDIFDeltaMissingChangeTypeError(self.dn, key, val)

        if val == b"modify":
            self.modifications = []
            self.mode = WAIT_FOR_MOD_SPEC
        elif val == b"add":
            self.mode = IN_ADD_ENTRY
        elif val == b"delete":
            self.mode = IN_DELETE
        elif val == b"modrdn" or val == b"moddn":
            # TODO: rename/move change records are not supported yet.
            raise NotImplementedError()
        else:
            # Report the dn and the offending value, like the other errors
            # raised by this class.
            raise LDIFDeltaUnknownChangeTypeError(self.dn, val)

    # Maps the key of a mod-spec header line to the delta class that
    # represents that kind of modification.
    MOD_SPEC_TO_DELTA = {
        b"add": delta.Add,
        b"delete": delta.Delete,
        b"replace": delta.Replace,
    }

    def state_WAIT_FOR_MOD_SPEC(self, line):
        """Start a new mod-spec, or finish the "modify" record.

        An empty line ends the record: a ``delta.ModifyOp`` holding the
        collected modifications is passed to ``self.gotEntry`` and the
        parser returns to ``ldifprotocol.WAIT_FOR_DN``. Any other line is
        taken as a mod-spec header whose key selects the modification kind
        and whose value names the attribute.

        :raises LDIFDeltaUnknownModificationError: the header key is not in
            ``MOD_SPEC_TO_DELTA``.
        """
        if line == b"":
            # End of entry: reset the per-record state before handing the
            # operation to the callback.
            self.mode = ldifprotocol.WAIT_FOR_DN
            m = delta.ModifyOp(dn=self.dn, modifications=self.modifications)
            self.dn = None
            self.data = None
            self.modifications = None
            self.gotEntry(m)
            return

        key, val = self._parseLine(line)

        if key not in self.MOD_SPEC_TO_DELTA:
            raise LDIFDeltaUnknownModificationError(self.dn, key)

        self.mod_spec = key
        self.mod_spec_attr = val
        self.mod_spec_data = []
        self.mode = IN_MOD_SPEC

    def state_IN_MOD_SPEC(self, line):
        """Collect the values of the current mod-spec until its ``-`` line.

        A ``-`` line closes the mod-spec: the matching delta object is built
        from the attribute name and the collected values, appended to
        ``self.modifications``, and the parser goes back to
        ``WAIT_FOR_MOD_SPEC``. Any other non-empty line must carry the same
        attribute name as the header; its value is collected.

        :raises LDIFDeltaModificationMissingEndDashError: an empty line
            arrives before the closing ``-``.
        :raises LDIFDeltaModificationDifferentAttributeTypeError: the line's
            key differs from the attribute named in the header.
        """
        if line == b"":
            # Give the same kind of context as the other errors: which
            # entry and which mod-spec was left unterminated.
            raise LDIFDeltaModificationMissingEndDashError(
                self.dn, self.mod_spec, self.mod_spec_attr
            )

        if line == b"-":
            mod = self.MOD_SPEC_TO_DELTA[self.mod_spec]
            de = mod(self.mod_spec_attr, self.mod_spec_data)
            self.modifications.append(de)
            # The per-mod-spec scratch attributes only exist while a
            # mod-spec is open.
            del self.mod_spec
            del self.mod_spec_attr
            del self.mod_spec_data
            self.mode = WAIT_FOR_MOD_SPEC
            return

        key, val = self._parseLine(line)

        if key != self.mod_spec_attr:
            raise LDIFDeltaModificationDifferentAttributeTypeError(
                key, self.mod_spec_attr
            )

        self.mod_spec_data.append(val)

    def state_IN_ADD_ENTRY(self, line):
        """Collect attribute lines of an "add" record.

        An empty line ends the record: a ``delta.AddOp`` wrapping an
        ``entry.BaseLDAPEntry`` built from the dn and the collected
        attributes is passed to ``self.gotEntry``. Any other line adds its
        value to the list kept under its key in ``self.data``.

        :raises LDIFDeltaAddMissingAttributesError: the record ends without
            any attribute collected.
        """
        assert self.dn is not None, "self.dn must be set when in entry"
        assert self.data is not None, "self.data must be set when in entry"

        if line == b"":
            # End of entry.
            if not self.data:
                raise LDIFDeltaAddMissingAttributesError(self.dn)
            self.mode = ldifprotocol.WAIT_FOR_DN
            o = delta.AddOp(entry.BaseLDAPEntry(dn=self.dn, attributes=self.data))
            self.dn = None
            self.data = None
            self.gotEntry(o)
            return

        key, val = self._parseLine(line)

        if key not in self.data:
            self.data[key] = []

        self.data[key].append(val)

    def state_IN_DELETE(self, line):
        """Finish a "delete" record; nothing but the end of entry is allowed.

        An empty line ends the record: a ``delta.DeleteOp`` for the dn is
        passed to ``self.gotEntry`` and the parser returns to
        ``ldifprotocol.WAIT_FOR_DN``.

        :raises LDIFDeltaDeleteHasJunkAfterChangeTypeError: any non-empty
            line is received.
        """
        assert self.dn is not None, "self.dn must be set when in entry"

        if line == b"":
            # End of entry.
            self.mode = ldifprotocol.WAIT_FOR_DN
            o = delta.DeleteOp(dn=self.dn)
            self.dn = None
            self.data = None
            self.gotEntry(o)
            return

        raise LDIFDeltaDeleteHasJunkAfterChangeTypeError(self.dn, line)
def fromLDIFFile(f):
    """Read LDIF data from a file.

    Feeds everything readable from ``f`` through an ``LDIFDelta`` parser and
    collects each operation the parser reports through ``gotEntry``.

    :param f: file-like object; ``f.read()`` is called repeatedly until it
        returns an empty value. Presumably opened in binary mode, since the
        parser compares lines against ``bytes`` literals -- confirm with
        callers.
    :return: list of the parsed operations, in the order they were reported.
    """
    parser = LDIFDelta()
    operations = []
    parser.gotEntry = operations.append
    while True:
        data = f.read()
        if not data:
            break
        parser.dataReceived(data)
    # Tell the parser the input ended cleanly (ConnectionDone).
    parser.connectionLost(Failure(error.ConnectionDone()))
    return operations