summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSergey Matveev <stargrave@stargrave.org>2020-02-15 14:51:09 +0300
committerSergey Matveev <stargrave@stargrave.org>2020-02-16 21:25:49 +0300
commite83a585f513b694f5f953fab15e5419e837dda7b (patch)
tree4d7d1fa4275113aa5f4e8a5c7b4cd21738ab04da
parente9ab3a6d6679d825d0e48523cec13ff710f0220d (diff)
downloadpyderasn-e83a585f513b694f5f953fab15e5419e837dda7b.tar.xz
agg_octet_string
-rw-r--r--doc/news.rst3
-rwxr-xr-xpyderasn.py24
-rw-r--r--tests/test_cms.py98
3 files changed, 99 insertions, 26 deletions
diff --git a/doc/news.rst b/doc/news.rst
index 5665a42..ec2cf02 100644
--- a/doc/news.rst
+++ b/doc/news.rst
@@ -17,6 +17,9 @@ News
where no in-memory objects storing happens, giving ability to process
ASN.1 data without fully parsing it first. ``python -m pyderasn`` has
``--evgen`` mode switcher
+* Useful ``agg_octet_string`` that is able to streamingly decode string
+ from events of ``evgen_mode``, allowing strings retrieving without
+ copying them to memory first
* Initial experimental CER encoding mode, allowing streaming encoding of
the data directly to some writeable object
* Ability to use mmap-ed memoryviews to skip files loading to memory
diff --git a/pyderasn.py b/pyderasn.py
index fe1d4ab..8d88a3b 100755
--- a/pyderasn.py
+++ b/pyderasn.py
@@ -816,6 +816,7 @@ except ImportError: # pragma: no cover
__version__ = "7.0"
__all__ = (
+ "agg_octet_string",
"Any",
"BitString",
"BMPString",
@@ -3554,6 +3555,29 @@ class OctetString(Obj):
yield pp
+def agg_octet_string(evgens, decode_path, raw, writer):
+ """Aggregate constructed string (OctetString and its derivatives)
+
+ :param evgens: iterator of generated events
+ :param decode_path: points to the string we want to decode
+ :param raw: slicebable (memoryview, bytearray, etc) with
+ the data evgens are generated one
+ :param writer: buffer.write where string is going to be saved
+ """
+ decode_path_len = len(decode_path)
+ for dp, obj, _ in evgens:
+ if dp[:decode_path_len] != decode_path:
+ continue
+ if not obj.ber_encoded:
+ write_full(writer, raw[
+ obj.offset + obj.tlen + obj.llen:
+ obj.offset + obj.tlen + obj.llen + obj.vlen -
+ (EOC_LEN if obj.expl_lenindef else 0)
+ ])
+ if len(dp) == decode_path_len:
+ break
+
+
NullState = namedtuple("NullState", BasicState._fields, **NAMEDTUPLE_KWARGS)
diff --git a/tests/test_cms.py b/tests/test_cms.py
index 86df541..359b6bb 100644
--- a/tests/test_cms.py
+++ b/tests/test_cms.py
@@ -15,16 +15,26 @@
# License along with this program. If not, see
# <http://www.gnu.org/licenses/>.
-from datetime import datetime
from hashlib import sha256
+from io import BytesIO
+from io import open as io_open
+from os import remove
from os import urandom
-from random import randint
from subprocess import call
+from tempfile import NamedTemporaryFile
+from unittest import skipIf
from unittest import TestCase
+from hypothesis import given
+from hypothesis import settings
+from hypothesis.strategies import integers
+from six import PY2
+
+from pyderasn import agg_octet_string
from pyderasn import Any
from pyderasn import Choice
from pyderasn import encode_cer
+from pyderasn import file_mmaped
from pyderasn import Integer
from pyderasn import ObjectIdentifier
from pyderasn import OctetString
@@ -32,11 +42,9 @@ from pyderasn import Sequence
from pyderasn import SetOf
from pyderasn import tag_ctxc
from pyderasn import tag_ctxp
-from pyderasn import UTCTime
from tests.test_crts import AlgorithmIdentifier
from tests.test_crts import Certificate
from tests.test_crts import SubjectKeyIdentifier
-from tests.test_crts import Time
class CMSVersion(Integer):
@@ -158,13 +166,39 @@ id_pkcs9_at_contentType = ObjectIdentifier("1.2.840.113549.1.9.3")
id_pkcs9_at_messageDigest = ObjectIdentifier("1.2.840.113549.1.9.4")
id_ce_subjectKeyIdentifier = ObjectIdentifier("2.5.29.14")
-
-class TestSignedDataCER(TestCase):
- def runTest(self):
- # openssl ecparam -name prime256v1 -genkey -out key.pem
- # openssl req -x509 -new -key key.pem -outform PEM -out cert.pem
- # -days 365 -nodes -subj "/CN=doesnotmatter"
- with open("cert.cer", "rb") as fd:
+openssl_cms_exists = call("openssl cms -help 2>/dev/null", shell=True) == 0
+
+@skipIf(not openssl_cms_exists, "openssl cms command not found")
+class TestSignedDataCERWithOpenSSL(TestCase):
+ @settings(deadline=None)
+ @given(integers(min_value=1000, max_value=5000))
+ def runTest(self, data_len):
+ def tmpfile():
+ tmp = NamedTemporaryFile(delete=False)
+ tmp.close()
+ tmp = tmp.name
+ self.addCleanup(lambda: remove(tmp))
+ return tmp
+ key_path = tmpfile()
+ self.assertEqual(0, call(
+ "openssl ecparam -name prime256v1 -genkey -out " + key_path,
+ shell=True,
+ ))
+ cert_path = tmpfile()
+ self.assertEqual(0, call(" ".join((
+ "openssl req -x509 -new",
+ ("-key " + key_path),
+ ("-outform PEM -out " + cert_path),
+ "-nodes -subj /CN=pyderasntest",
+ )), shell=True))
+ cert_der_path = tmpfile()
+ self.assertEqual(0, call(" ".join((
+ "openssl x509",
+ "-inform PEM -in " + cert_path,
+ "-outform DER -out " + cert_der_path,
+ )), shell=True))
+ self.assertEqual(0, call("cat %s >> %s" % (key_path, cert_path), shell=True))
+ with open(cert_der_path, "rb") as fd:
cert = Certificate().decod(fd.read())
for ext in cert["tbsCertificate"]["extensions"]:
if ext["extnID"] == id_ce_subjectKeyIdentifier:
@@ -172,7 +206,7 @@ class TestSignedDataCER(TestCase):
ai_sha256 = AlgorithmIdentifier((
("algorithm", id_sha256),
))
- data = urandom(randint(1000, 3000))
+ data = urandom(data_len)
eci = EncapsulatedContentInfo((
("eContentType", ContentType(id_data)),
("eContent", OctetString(data)),
@@ -193,15 +227,18 @@ class TestSignedDataCER(TestCase):
])),
)),
])
- with open("/tmp/in", "wb") as fd:
+ input_path = tmpfile()
+ with open(input_path, "wb") as fd:
fd.write(encode_cer(signed_attrs))
+ signature_path = tmpfile()
self.assertEqual(0, call(" ".join((
"openssl dgst -sha256",
- "-sign key.pem",
- "-binary",
- "/tmp/in",
- "> /tmp/signature",
+ ("-sign " + key_path),
+ "-binary", input_path,
+ ("> " + signature_path),
)), shell=True))
+ with open(signature_path, "rb") as fd:
+ signature = fd.read()
ci = ContentInfo((
("contentType", ContentType(id_signedData)),
("content", Any((SignedData((
@@ -213,23 +250,32 @@ class TestSignedDataCER(TestCase):
])),
("signerInfos", SignerInfos([SignerInfo((
("version", CMSVersion("v3")),
- ("sid", SignerIdentifier(
- ("subjectKeyIdentifier", skid)
- )),
+ ("sid", SignerIdentifier(("subjectKeyIdentifier", skid))),
("digestAlgorithm", DigestAlgorithmIdentifier(ai_sha256)),
("signedAttrs", signed_attrs),
("signatureAlgorithm", SignatureAlgorithmIdentifier((
("algorithm", id_ecdsa_with_SHA256),
))),
- ("signature", SignatureValue(open("/tmp/signature", "rb").read())),
+ ("signature", SignatureValue(signature)),
))])),
))))),
))
- with open("/tmp/out.p7m", "wb") as fd:
- fd.write(encode_cer(ci))
+ output_path = tmpfile()
+ with io_open(output_path, "wb") as fd:
+ ci.encode_cer(writer=fd.write)
self.assertEqual(0, call(" ".join((
"openssl cms -verify",
- "-inform DER -in /tmp/out.p7m",
- "-signer cert.pem -CAfile cert.pem",
- "-out /dev/null",
+ ("-inform DER -in " + output_path),
+ "-signer %s -CAfile %s" % (cert_path, cert_path),
+ "-out /dev/null 2>/dev/null",
)), shell=True))
+ fd = open(output_path, "rb")
+ raw = memoryview(fd.read()) if PY2 else file_mmaped(fd)
+ ctx = {"bered": True}
+ for decode_path, obj, _ in ContentInfo().decode_evgen(raw, ctx=ctx):
+ if decode_path == ("content",):
+ break
+ evgens = SignedData().decode_evgen(raw[obj.offset:], offset=obj.offset, ctx=ctx)
+ buf = BytesIO()
+ agg_octet_string(evgens, ("encapContentInfo", "eContent"), raw, buf.write)
+ self.assertSequenceEqual(buf.getvalue(), data)