summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSergey Matveev <stargrave@stargrave.org>2020-02-10 19:41:57 +0300
committerSergey Matveev <stargrave@stargrave.org>2020-02-11 13:49:46 +0300
commit27151db6d1a8da5a9e8962eb1a7df710524ab3d8 (patch)
treef9521b0d3429330a265c58f2bae778a25dce20df
parentf7a92debecda42633ba9a7c320eaaa81871a4d49 (diff)
downloadpyderasn-27151db6d1a8da5a9e8962eb1a7df710524ab3d8.tar.xz
Initial support for event generated mode
-rwxr-xr-xpyderasn.py513
1 files changed, 355 insertions, 158 deletions
diff --git a/pyderasn.py b/pyderasn.py
index fcaa5a6..7c8e64d 100755
--- a/pyderasn.py
+++ b/pyderasn.py
@@ -781,6 +781,19 @@ def fractions2float(fractions_raw):
return float("0." + fractions_raw)
+def get_def_by_path(defines_by_path, sub_decode_path):
+ """Get define by decode path
+ """
+ for path, define in defines_by_path:
+ if len(path) != len(sub_decode_path):
+ continue
+ for p1, p2 in zip(path, sub_decode_path):
+ if (not p1 is any) and (p1 != p2):
+ break
+ else:
+ return define
+
+
########################################################################
# Errors
########################################################################
@@ -1184,8 +1197,8 @@ class Obj(object):
def _encode(self): # pragma: no cover
raise NotImplementedError()
- def _decode(self, tlv, offset, decode_path, ctx, tag_only): # pragma: no cover
- raise NotImplementedError()
+ def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode): # pragma: no cover
+ yield NotImplemented
def encode(self):
"""Encode the structure
@@ -1212,6 +1225,32 @@ class Obj(object):
tag_only=False,
_ctx_immutable=True,
):
+ result = next(self.decode_evgen(
+ data,
+ offset,
+ leavemm,
+ decode_path,
+ ctx,
+ tag_only,
+ _ctx_immutable,
+ _evgen_mode=False,
+ ))
+ if result is None:
+ return None
+ _, obj, tail = result
+ return obj, tail
+
+ def decode_evgen(
+ self,
+ data,
+ offset=0,
+ leavemm=False,
+ decode_path=(),
+ ctx=None,
+ tag_only=False,
+ _ctx_immutable=True,
+ _evgen_mode=True,
+ ):
"""Decode the data
:param data: either binary or memoryview
@@ -1233,17 +1272,26 @@ class Obj(object):
elif _ctx_immutable:
ctx = copy(ctx)
tlv = memoryview(data)
+ if (
+ _evgen_mode and
+ get_def_by_path(ctx.get("evgen_mode_upto", ()), decode_path)
+ ):
+ _evgen_mode=False
if self._expl is None:
- result = self._decode(
- tlv,
- offset,
- decode_path=decode_path,
- ctx=ctx,
- tag_only=tag_only,
- )
- if tag_only:
- return None
- obj, tail = result
+ for result in self._decode(
+ tlv,
+ offset=offset,
+ decode_path=decode_path,
+ ctx=ctx,
+ tag_only=tag_only,
+ evgen_mode=_evgen_mode,
+ ):
+ if tag_only:
+ yield None
+ return
+ _decode_path, obj, tail = result
+ if not _decode_path is decode_path:
+ yield result
else:
try:
t, tlen, lv = tag_strip(tlv)
@@ -1272,16 +1320,20 @@ class Obj(object):
)
llen, v = 1, lv[1:]
offset += tlen + llen
- result = self._decode(
- v,
- offset=offset,
- decode_path=decode_path,
- ctx=ctx,
- tag_only=tag_only,
- )
- if tag_only: # pragma: no cover
- return None
- obj, tail = result
+ for result in self._decode(
+ v,
+ offset=offset,
+ decode_path=decode_path,
+ ctx=ctx,
+ tag_only=tag_only,
+ evgen_mode=_evgen_mode,
+ ):
+ if tag_only: # pragma: no cover
+ yield None
+ return
+ _decode_path, obj, tail = result
+ if not _decode_path is decode_path:
+ yield result
eoc_expected, tail = tail[:EOC_LEN], tail[EOC_LEN:]
if eoc_expected.tobytes() != EOC:
raise DecodeError(
@@ -1307,16 +1359,20 @@ class Obj(object):
decode_path=decode_path,
offset=offset,
)
- result = self._decode(
- v,
- offset=offset + tlen + llen,
- decode_path=decode_path,
- ctx=ctx,
- tag_only=tag_only,
- )
- if tag_only: # pragma: no cover
- return None
- obj, tail = result
+ for result in self._decode(
+ v,
+ offset=offset + tlen + llen,
+ decode_path=decode_path,
+ ctx=ctx,
+ tag_only=tag_only,
+ evgen_mode=_evgen_mode,
+ ):
+ if tag_only: # pragma: no cover
+ yield None
+ return
+ _decode_path, obj, tail = result
+ if not _decode_path is decode_path:
+ yield result
if obj.tlvlen < l and not ctx.get("allow_expl_oob", False):
raise DecodeError(
"explicit tag out-of-bound, longer than data",
@@ -1324,7 +1380,7 @@ class Obj(object):
decode_path=decode_path,
offset=offset,
)
- return obj, (tail if leavemm else tail.tobytes())
+ yield decode_path, obj, (tail if leavemm else tail.tobytes())
def decod(self, data, offset=0, decode_path=(), ctx=None):
"""Decode the data, check that tail is empty
@@ -1885,7 +1941,7 @@ class Boolean(Obj):
(b"\xFF" if self._value else b"\x00"),
))
- def _decode(self, tlv, offset, decode_path, ctx, tag_only):
+ def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
try:
t, _, lv = tag_strip(tlv)
except DecodeError as err:
@@ -1902,7 +1958,8 @@ class Boolean(Obj):
offset=offset,
)
if tag_only:
- return None
+ yield None
+ return
try:
l, _, v = len_decode(lv)
except DecodeError as err:
@@ -1951,7 +2008,7 @@ class Boolean(Obj):
_decoded=(offset, 1, 1),
)
obj.ber_encoded = ber_encoded
- return obj, v[1:]
+ yield decode_path, obj, v[1:]
def __repr__(self):
return pp_console_row(next(self.pps()))
@@ -2233,7 +2290,7 @@ class Integer(Obj):
break
return b"".join((self.tag, len_encode(len(octets)), octets))
- def _decode(self, tlv, offset, decode_path, ctx, tag_only):
+ def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
try:
t, _, lv = tag_strip(tlv)
except DecodeError as err:
@@ -2250,7 +2307,8 @@ class Integer(Obj):
offset=offset,
)
if tag_only:
- return None
+ yield None
+ return
try:
l, llen, v = len_decode(lv)
except DecodeError as err:
@@ -2321,7 +2379,7 @@ class Integer(Obj):
decode_path=decode_path,
offset=offset,
)
- return obj, tail
+ yield decode_path, obj, tail
def __repr__(self):
return pp_console_row(next(self.pps()))
@@ -2631,7 +2689,7 @@ class BitString(Obj):
octets,
))
- def _decode(self, tlv, offset, decode_path, ctx, tag_only):
+ def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
try:
t, tlen, lv = tag_strip(tlv)
except DecodeError as err:
@@ -2643,7 +2701,8 @@ class BitString(Obj):
)
if t == self.tag:
if tag_only: # pragma: no cover
- return None
+ yield None
+ return
try:
l, llen, v = len_decode(lv)
except DecodeError as err:
@@ -2690,8 +2749,9 @@ class BitString(Obj):
offset=offset,
)
v, tail = v[:l], v[l:]
+ bit_len = (len(v) - 1) * 8 - pad_size
obj = self.__class__(
- value=((len(v) - 1) * 8 - pad_size, v[1:].tobytes()),
+ value=None if evgen_mode else (bit_len, v[1:].tobytes()),
impl=self.tag,
expl=self._expl,
default=self.default,
@@ -2699,7 +2759,10 @@ class BitString(Obj):
_specs=self.specs,
_decoded=(offset, llen, l),
)
- return obj, tail
+ if evgen_mode:
+ obj._value = (bit_len, None)
+ yield decode_path, obj, tail
+ return
if t != self.tag_constructed:
raise TagMismatch(
klass=self.__class__,
@@ -2714,7 +2777,8 @@ class BitString(Obj):
offset=offset,
)
if tag_only: # pragma: no cover
- return None
+ yield None
+ return
lenindef = False
try:
l, llen, v = len_decode(lv)
@@ -2761,14 +2825,26 @@ class BitString(Obj):
)
sub_decode_path = decode_path + (str(len(chunks)),)
try:
- chunk, v_tail = BitString().decode(
- v,
- offset=sub_offset,
- decode_path=sub_decode_path,
- leavemm=True,
- ctx=ctx,
- _ctx_immutable=False,
- )
+ if evgen_mode:
+ for _decode_path, chunk, v_tail in BitString().decode_evgen(
+ v,
+ offset=sub_offset,
+ decode_path=sub_decode_path,
+ leavemm=True,
+ ctx=ctx,
+ _ctx_immutable=False,
+ ):
+ yield _decode_path, chunk, v_tail
+ else:
+ _, chunk, v_tail = next(BitString().decode_evgen(
+ v,
+ offset=sub_offset,
+ decode_path=sub_decode_path,
+ leavemm=True,
+ ctx=ctx,
+ _ctx_immutable=False,
+ _evgen_mode=False,
+ ))
except TagMismatch:
raise DecodeError(
"expected BitString encoded chunk",
@@ -2797,13 +2873,15 @@ class BitString(Obj):
decode_path=decode_path + (str(chunk_i),),
offset=chunk.offset,
)
- values.append(bytes(chunk))
+ if not evgen_mode:
+ values.append(bytes(chunk))
bit_len += chunk.bit_len
chunk_last = chunks[-1]
- values.append(bytes(chunk_last))
+ if not evgen_mode:
+ values.append(bytes(chunk_last))
bit_len += chunk_last.bit_len
obj = self.__class__(
- value=(bit_len, b"".join(values)),
+ value=None if evgen_mode else (bit_len, b"".join(values)),
impl=self.tag,
expl=self._expl,
default=self.default,
@@ -2811,9 +2889,11 @@ class BitString(Obj):
_specs=self.specs,
_decoded=(offset, llen, vlen + (EOC_LEN if lenindef else 0)),
)
+ if evgen_mode:
+ obj._value = (bit_len, None)
obj.lenindef = lenindef
obj.ber_encoded = True
- return obj, (v[EOC_LEN:] if lenindef else v)
+ yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
def __repr__(self):
return pp_console_row(next(self.pps()))
@@ -2906,6 +2986,7 @@ class OctetString(Obj):
__slots__ = ("tag_constructed", "_bound_min", "_bound_max", "defined")
tag_default = tag_encode(4)
asn1_type_name = "OCTET STRING"
+ evgen_mode_skip_value = True
def __init__(
self,
@@ -3054,7 +3135,7 @@ class OctetString(Obj):
self._value,
))
- def _decode(self, tlv, offset, decode_path, ctx, tag_only):
+ def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
try:
t, tlen, lv = tag_strip(tlv)
except DecodeError as err:
@@ -3066,7 +3147,8 @@ class OctetString(Obj):
)
if t == self.tag:
if tag_only:
- return None
+ yield None
+ return
try:
l, llen, v = len_decode(lv)
except DecodeError as err:
@@ -3084,9 +3166,19 @@ class OctetString(Obj):
offset=offset,
)
v, tail = v[:l], v[l:]
+ if evgen_mode and not self._bound_min <= len(v) <= self._bound_max:
+ raise DecodeError(
+ msg=str(BoundsError(self._bound_min, len(v), self._bound_max)),
+ klass=self.__class__,
+ decode_path=decode_path,
+ offset=offset,
+ )
try:
obj = self.__class__(
- value=v.tobytes(),
+ value=(
+ None if (evgen_mode and self.evgen_mode_skip_value)
+ else v.tobytes()
+ ),
bounds=(self._bound_min, self._bound_max),
impl=self.tag,
expl=self._expl,
@@ -3109,7 +3201,8 @@ class OctetString(Obj):
decode_path=decode_path,
offset=offset,
)
- return obj, tail
+ yield decode_path, obj, tail
+ return
if t != self.tag_constructed:
raise TagMismatch(
klass=self.__class__,
@@ -3124,7 +3217,8 @@ class OctetString(Obj):
offset=offset,
)
if tag_only:
- return None
+ yield None
+ return
lenindef = False
try:
l, llen, v = len_decode(lv)
@@ -3148,6 +3242,7 @@ class OctetString(Obj):
chunks = []
sub_offset = offset + tlen + llen
vlen = 0
+ payload_len = 0
while True:
if lenindef:
if v[:EOC_LEN].tobytes() == EOC:
@@ -3164,14 +3259,29 @@ class OctetString(Obj):
)
sub_decode_path = decode_path + (str(len(chunks)),)
try:
- chunk, v_tail = OctetString().decode(
- v,
- offset=sub_offset,
- decode_path=sub_decode_path,
- leavemm=True,
- ctx=ctx,
- _ctx_immutable=False,
- )
+ if evgen_mode:
+ for _decode_path, chunk, v_tail in OctetString().decode_evgen(
+ v,
+ offset=sub_offset,
+ decode_path=sub_decode_path,
+ leavemm=True,
+ ctx=ctx,
+ _ctx_immutable=False,
+ ):
+ yield _decode_path, chunk, v_tail
+ if not chunk.ber_encoded:
+ payload_len += chunk.vlen
+ else:
+ _, chunk, v_tail = next(OctetString().decode_evgen(
+ v,
+ offset=sub_offset,
+ decode_path=sub_decode_path,
+ leavemm=True,
+ ctx=ctx,
+ _ctx_immutable=False,
+ _evgen_mode=False,
+ ))
+ chunks.append(chunk)
except TagMismatch:
raise DecodeError(
"expected OctetString encoded chunk",
@@ -3179,13 +3289,22 @@ class OctetString(Obj):
decode_path=sub_decode_path,
offset=sub_offset,
)
- chunks.append(chunk)
sub_offset += chunk.tlvlen
vlen += chunk.tlvlen
v = v_tail
+ if evgen_mode and not self._bound_min <= payload_len <= self._bound_max:
+ raise DecodeError(
+ msg=str(BoundsError(self._bound_min, payload_len, self._bound_max)),
+ klass=self.__class__,
+ decode_path=decode_path,
+ offset=offset,
+ )
try:
obj = self.__class__(
- value=b"".join(bytes(chunk) for chunk in chunks),
+ value=(
+ None if evgen_mode else
+ b"".join(bytes(chunk) for chunk in chunks)
+ ),
bounds=(self._bound_min, self._bound_max),
impl=self.tag,
expl=self._expl,
@@ -3210,7 +3329,7 @@ class OctetString(Obj):
)
obj.lenindef = lenindef
obj.ber_encoded = True
- return obj, (v[EOC_LEN:] if lenindef else v)
+ yield decode_path, obj, (v[EOC_LEN:] if lenindef else v)
def __repr__(self):
return pp_console_row(next(self.pps()))
@@ -3348,7 +3467,7 @@ class Null(Obj):
def _encode(self):
return self.tag + len_encode(0)
- def _decode(self, tlv, offset, decode_path, ctx, tag_only):
+ def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
try:
t, _, lv = tag_strip(tlv)
except DecodeError as err:
@@ -3365,7 +3484,8 @@ class Null(Obj):
offset=offset,
)
if tag_only: # pragma: no cover
- return None
+ yield None
+ return
try:
l, _, v = len_decode(lv)
except DecodeError as err:
@@ -3388,7 +3508,7 @@ class Null(Obj):
optional=self.optional,
_decoded=(offset, 1, 0),
)
- return obj, v
+ yield decode_path, obj, v
def __repr__(self):
return pp_console_row(next(self.pps()))
@@ -3632,7 +3752,7 @@ class ObjectIdentifier(Obj):
v = b"".join(octets)
return b"".join((self.tag, len_encode(len(v)), v))
- def _decode(self, tlv, offset, decode_path, ctx, tag_only):
+ def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
try:
t, _, lv = tag_strip(tlv)
except DecodeError as err:
@@ -3649,7 +3769,8 @@ class ObjectIdentifier(Obj):
offset=offset,
)
if tag_only: # pragma: no cover
- return None
+ yield None
+ return
try:
l, llen, v = len_decode(lv)
except DecodeError as err:
@@ -3719,7 +3840,7 @@ class ObjectIdentifier(Obj):
)
if ber_encoded:
obj.ber_encoded = True
- return obj, tail
+ yield decode_path, obj, tail
def __repr__(self):
return pp_console_row(next(self.pps()))
@@ -3886,6 +4007,7 @@ class CommonString(OctetString):
- utf-16-be
"""
__slots__ = ()
+ evgen_mode_skip_value = False
def _value_sanitize(self, value):
value_raw = None
@@ -4780,7 +4902,7 @@ class Choice(Obj):
self._assert_ready()
return self._value[1].encode()
- def _decode(self, tlv, offset, decode_path, ctx, tag_only):
+ def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
for choice, spec in iteritems(self.specs):
sub_decode_path = decode_path + (choice,)
try:
@@ -4803,15 +4925,28 @@ class Choice(Obj):
offset=offset,
)
if tag_only: # pragma: no cover
- return None
- value, tail = spec.decode(
- tlv,
- offset=offset,
- leavemm=True,
- decode_path=sub_decode_path,
- ctx=ctx,
- _ctx_immutable=False,
- )
+ yield None
+ return
+ if evgen_mode:
+ for _decode_path, value, tail in spec.decode_evgen(
+ tlv,
+ offset=offset,
+ leavemm=True,
+ decode_path=sub_decode_path,
+ ctx=ctx,
+ _ctx_immutable=False,
+ ):
+ yield _decode_path, value, tail
+ else:
+ _, value, tail = next(spec.decode_evgen(
+ tlv,
+ offset=offset,
+ leavemm=True,
+ decode_path=sub_decode_path,
+ ctx=ctx,
+ _ctx_immutable=False,
+ _evgen_mode=False,
+ ))
obj = self.__class__(
schema=self.specs,
expl=self._expl,
@@ -4820,7 +4955,7 @@ class Choice(Obj):
_decoded=(offset, 0, value.fulllen),
)
obj._value = (choice, value)
- return obj, tail
+ yield decode_path, obj, tail
def __repr__(self):
value = pp_console_row(next(self.pps()))
@@ -5018,7 +5153,7 @@ class Any(Obj):
self._assert_ready()
return self._value
- def _decode(self, tlv, offset, decode_path, ctx, tag_only):
+ def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
try:
t, tlen, lv = tag_strip(tlv)
except DecodeError as err:
@@ -5055,14 +5190,15 @@ class Any(Obj):
chunk_i += 1
tlvlen = tlen + llen + vlen + EOC_LEN
obj = self.__class__(
- value=tlv[:tlvlen].tobytes(),
+ value=None if evgen_mode else tlv[:tlvlen].tobytes(),
expl=self._expl,
optional=self.optional,
_decoded=(offset, 0, tlvlen),
)
obj.lenindef = True
obj.tag = t.tobytes()
- return obj, v[EOC_LEN:]
+ yield decode_path, obj, v[EOC_LEN:]
+ return
except DecodeError as err:
raise err.__class__(
msg=err.msg,
@@ -5080,13 +5216,13 @@ class Any(Obj):
tlvlen = tlen + llen + l
v, tail = tlv[:tlvlen], v[l:]
obj = self.__class__(
- value=v.tobytes(),
+ value=None if evgen_mode else v.tobytes(),
expl=self._expl,
optional=self.optional,
_decoded=(offset, 0, tlvlen),
)
obj.tag = t.tobytes()
- return obj, tail
+ yield decode_path, obj, tail
def __repr__(self):
return pp_console_row(next(self.pps()))
@@ -5434,7 +5570,7 @@ class Sequence(Obj):
v = b"".join(self._encoded_values())
return b"".join((self.tag, len_encode(len(v)), v))
- def _decode(self, tlv, offset, decode_path, ctx, tag_only):
+ def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
try:
t, tlen, lv = tag_strip(tlv)
except DecodeError as err:
@@ -5451,7 +5587,8 @@ class Sequence(Obj):
offset=offset,
)
if tag_only: # pragma: no cover
- return None
+ yield None
+ return
lenindef = False
ctx_bered = ctx.get("bered", False)
try:
@@ -5495,21 +5632,33 @@ class Sequence(Obj):
continue
sub_decode_path = decode_path + (name,)
try:
- value, v_tail = spec.decode(
- v,
- sub_offset,
- leavemm=True,
- decode_path=sub_decode_path,
- ctx=ctx,
- _ctx_immutable=False,
- )
+ if evgen_mode:
+ for _decode_path, value, v_tail in spec.decode_evgen(
+ v,
+ sub_offset,
+ leavemm=True,
+ decode_path=sub_decode_path,
+ ctx=ctx,
+ _ctx_immutable=False,
+ ):
+ yield _decode_path, value, v_tail
+ else:
+ _, value, v_tail = next(spec.decode_evgen(
+ v,
+ sub_offset,
+ leavemm=True,
+ decode_path=sub_decode_path,
+ ctx=ctx,
+ _ctx_immutable=False,
+ _evgen_mode=False,
+ ))
except TagMismatch as err:
if (len(err.decode_path) == len(decode_path) + 1) and spec.optional:
continue
raise
defined = get_def_by_path(ctx.get("_defines", ()), sub_decode_path)
- if defined is not None:
+ if not evgen_mode and defined is not None:
defined_by, defined_spec = defined
if issubclass(value.__class__, SequenceOf):
for i, _value in enumerate(value):
@@ -5561,31 +5710,32 @@ class Sequence(Obj):
vlen += value_len
sub_offset += value_len
v = v_tail
- if spec.default is not None and value == spec.default:
- if ctx_bered or ctx_allow_default_values:
- ber_encoded = True
- else:
- raise DecodeError(
- "DEFAULT value met",
- klass=self.__class__,
- decode_path=sub_decode_path,
- offset=sub_offset,
- )
- values[name] = value
-
- spec_defines = getattr(spec, "defines", ())
- if len(spec_defines) == 0:
- defines_by_path = ctx.get("defines_by_path", ())
- if len(defines_by_path) > 0:
- spec_defines = get_def_by_path(defines_by_path, sub_decode_path)
- if spec_defines is not None and len(spec_defines) > 0:
- for rel_path, schema in spec_defines:
- defined = schema.get(value, None)
- if defined is not None:
- ctx.setdefault("_defines", []).append((
- abs_decode_path(sub_decode_path[:-1], rel_path),
- (value, defined),
- ))
+ if not evgen_mode:
+ if spec.default is not None and value == spec.default:
+ # This will not work in evgen_mode
+ if ctx_bered or ctx_allow_default_values:
+ ber_encoded = True
+ else:
+ raise DecodeError(
+ "DEFAULT value met",
+ klass=self.__class__,
+ decode_path=sub_decode_path,
+ offset=sub_offset,
+ )
+ values[name] = value
+ spec_defines = getattr(spec, "defines", ())
+ if len(spec_defines) == 0:
+ defines_by_path = ctx.get("defines_by_path", ())
+ if len(defines_by_path) > 0:
+ spec_defines = get_def_by_path(defines_by_path, sub_decode_path)
+ if spec_defines is not None and len(spec_defines) > 0:
+ for rel_path, schema in spec_defines:
+ defined = schema.get(value, None)
+ if defined is not None:
+ ctx.setdefault("_defines", []).append((
+ abs_decode_path(sub_decode_path[:-1], rel_path),
+ (value, defined),
+ ))
if lenindef:
if v[:EOC_LEN].tobytes() != EOC:
raise DecodeError(
@@ -5614,7 +5764,7 @@ class Sequence(Obj):
obj._value = values
obj.lenindef = lenindef
obj.ber_encoded = ber_encoded
- return obj, tail
+ yield decode_path, obj, tail
def __repr__(self):
value = pp_console_row(next(self.pps()))
@@ -5684,7 +5834,7 @@ class Set(Sequence):
def _specs_items(self):
return iteritems(self.specs)
- def _decode(self, tlv, offset, decode_path, ctx, tag_only):
+ def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
try:
t, tlen, lv = tag_strip(tlv)
except DecodeError as err:
@@ -5701,7 +5851,8 @@ class Set(Sequence):
offset=offset,
)
if tag_only:
- return None
+ yield None
+ return
lenindef = False
ctx_bered = ctx.get("bered", False)
try:
@@ -5763,14 +5914,26 @@ class Set(Sequence):
decode_path=decode_path,
offset=offset,
)
- value, v_tail = spec.decode(
- v,
- sub_offset,
- leavemm=True,
- decode_path=sub_decode_path,
- ctx=ctx,
- _ctx_immutable=False,
- )
+ if evgen_mode:
+ for _decode_path, value, v_tail in spec.decode_evgen(
+ v,
+ sub_offset,
+ leavemm=True,
+ decode_path=sub_decode_path,
+ ctx=ctx,
+ _ctx_immutable=False,
+ ):
+ yield _decode_path, value, v_tail
+ else:
+ _, value, v_tail = next(spec.decode_evgen(
+ v,
+ sub_offset,
+ leavemm=True,
+ decode_path=sub_decode_path,
+ ctx=ctx,
+ _ctx_immutable=False,
+ _evgen_mode=False,
+ ))
value_len = value.fulllen
if value_prev.tobytes() > v[:value_len].tobytes():
if ctx_bered or ctx_allow_unordered_set:
@@ -5816,7 +5979,6 @@ class Set(Sequence):
)
tail = v[EOC_LEN:]
obj.lenindef = True
- obj._value = values
for name, spec in iteritems(self.specs):
if name not in values and not spec.optional:
raise DecodeError(
@@ -5825,8 +5987,10 @@ class Set(Sequence):
decode_path=decode_path,
offset=offset,
)
+ if not evgen_mode:
+ obj._value = values
obj.ber_encoded = ber_encoded
- return obj, tail
+ yield decode_path, obj, tail
SequenceOfState = namedtuple("SequenceOfState", (
@@ -6050,7 +6214,16 @@ class SequenceOf(Obj):
v = b"".join(self._encoded_values())
return b"".join((self.tag, len_encode(len(v)), v))
- def _decode(self, tlv, offset, decode_path, ctx, tag_only, ordering_check=False):
+ def _decode(
+ self,
+ tlv,
+ offset,
+ decode_path,
+ ctx,
+ tag_only,
+ evgen_mode,
+ ordering_check=False,
+ ):
try:
t, tlen, lv = tag_strip(tlv)
except DecodeError as err:
@@ -6067,7 +6240,8 @@ class SequenceOf(Obj):
offset=offset,
)
if tag_only:
- return None
+ yield None
+ return
lenindef = False
ctx_bered = ctx.get("bered", False)
try:
@@ -6101,6 +6275,7 @@ class SequenceOf(Obj):
vlen = 0
sub_offset = offset + tlen + llen
_value = []
+ _value_count = 0
ctx_allow_unordered_set = ctx.get("allow_unordered_set", False)
value_prev = memoryview(v[:0])
ber_encoded = False
@@ -6108,15 +6283,27 @@ class SequenceOf(Obj):
while len(v) > 0:
if lenindef and v[:EOC_LEN].tobytes() == EOC:
break
- sub_decode_path = decode_path + (str(len(_value)),)
- value, v_tail = spec.decode(
- v,
- sub_offset,
- leavemm=True,
- decode_path=sub_decode_path,
- ctx=ctx,
- _ctx_immutable=False,
- )
+ sub_decode_path = decode_path + (str(_value_count),)
+ if evgen_mode:
+ for _decode_path, value, v_tail in spec.decode_evgen(
+ v,
+ sub_offset,
+ leavemm=True,
+ decode_path=sub_decode_path,
+ ctx=ctx,
+ _ctx_immutable=False,
+ ):
+ yield _decode_path, value, v_tail
+ else:
+ _, value, v_tail = next(spec.decode_evgen(
+ v,
+ sub_offset,
+ leavemm=True,
+ decode_path=sub_decode_path,
+ ctx=ctx,
+ _ctx_immutable=False,
+ _evgen_mode=False,
+ ))
value_len = value.fulllen
if ordering_check:
if value_prev.tobytes() > v[:value_len].tobytes():
@@ -6130,13 +6317,22 @@ class SequenceOf(Obj):
offset=sub_offset,
)
value_prev = v[:value_len]
- _value.append(value)
+ _value_count += 1
+ if not evgen_mode:
+ _value.append(value)
sub_offset += value_len
vlen += value_len
v = v_tail
+ if evgen_mode and not self._bound_min <= _value_count <= self._bound_max:
+ raise DecodeError(
+ msg=str(BoundsError(self._bound_min, _value_count, self._bound_max)),
+ klass=self.__class__,
+ decode_path=decode_path,
+ offset=offset,
+ )
try:
obj = self.__class__(
- value=_value,
+ value=None if evgen_mode else _value,
schema=spec,
bounds=(self._bound_min, self._bound_max),
impl=self.tag,
@@ -6163,7 +6359,7 @@ class SequenceOf(Obj):
obj.lenindef = True
tail = v[EOC_LEN:]
obj.ber_encoded = ber_encoded
- return obj, tail
+ yield decode_path, obj, tail
def __repr__(self):
return "%s[%s]" % (
@@ -6215,13 +6411,14 @@ class SetOf(SequenceOf):
v = b"".join(raws)
return b"".join((self.tag, len_encode(len(v)), v))
- def _decode(self, tlv, offset, decode_path, ctx, tag_only):
+ def _decode(self, tlv, offset, decode_path, ctx, tag_only, evgen_mode):
return super(SetOf, self)._decode(
tlv,
offset,
decode_path,
ctx,
tag_only,
+ evgen_mode,
ordering_check=True,
)