-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
406 lines (343 loc) · 16.6 KB
/
main.py
File metadata and controls
406 lines (343 loc) · 16.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
"""
GCRY Prototype — versão "insana"
Extensões adicionadas ("mais insano"):
- suporte a CIPHER: AES-256-GCM (padrão) ou XChaCha20-Poly1305 (se disponível)
- compressão opcional (gzip) antes da cifragem
- suporte a múltiplos destinatários (wrap da file_key com chaves RSA públicas)
- suporte a proteção por senha (KEK via PBKDF2) simultaneamente com recipients
- assinatura Ed25519 do header + metadados (opcional)
- ofuscação do campo `filename` (encryptado com a file_key)
- CLI com flags: --cipher, --recipients, --sign-key, --compress, --obfuscate
Aviso: ainda é um protótipo educacional. Misturar muitas features aumenta a complexidade.
Dependências: pip install cryptography
Uso:
python gcry_prototype.py encrypt in.bin out.gcry --password 'senha' --cipher chacha --recipients "bob.pub.pem,alice.pub.pem" --sign-key me_priv.pem --compress --obfuscate
python gcry_prototype.py decrypt in.gcry out.bin --password 'senha' --priv-keys "bob_priv.pem,alice_priv.pem" --verify-key me_pub.pem
"""
from __future__ import annotations
import os
import json
import base64
import struct
import argparse
import gzip
from typing import Dict, List, Optional
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives.asymmetric import padding as asym_padding
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric import ed25519
# Try import XChaCha20Poly1305 (may not be available in older cryptography)
try:
from cryptography.hazmat.primitives.ciphers.aead import XChaCha20Poly1305
HAVE_XCHACHA = True
except Exception:
XChaCha20Poly1305 = None
HAVE_XCHACHA = False
MAGIC = b'GCRYFILE' # 8 bytes
CHUNK_SIZE = 65536
KDF_ITERS_DEFAULT = 200_000
def b64(x: bytes) -> str:
return base64.b64encode(x).decode('ascii')
def ub64(s: str) -> bytes:
return base64.b64decode(s)
def int_to_4be(i: int) -> bytes:
return struct.pack('>I', i)
# KDF
def derive_kek(password: bytes, salt: bytes, iterations: int = KDF_ITERS_DEFAULT) -> bytes:
kdf = PBKDF2HMAC(algorithm=hashes.SHA256(), length=32, salt=salt, iterations=iterations)
return kdf.derive(password)
# RSA wrap/unwrap
from cryptography.hazmat.primitives.asymmetric.padding import OAEP, MGF1
from cryptography.hazmat.primitives import hashes as chashes
from cryptography.hazmat.primitives.serialization import load_pem_public_key, load_pem_private_key
# Helpers for loading keys
def load_rsa_pub(path: str):
with open(path, 'rb') as f:
return load_pem_public_key(f.read())
def load_rsa_priv(path: str, password: Optional[bytes] = None):
with open(path, 'rb') as f:
return load_pem_private_key(f.read(), password=password)
# Ed25519 load
def load_ed25519_priv(path: str, password: Optional[bytes] = None):
with open(path, 'rb') as f:
return load_pem_private_key(f.read(), password=password)
def load_ed25519_pub(path: str):
with open(path, 'rb') as f:
return serialization.load_pem_public_key(f.read())
# Header helpers
def write_header(fout, header: Dict):
header_bytes = json.dumps(header, separators=(',',':')).encode('utf-8')
fout.write(MAGIC)
fout.write(struct.pack('>I', len(header_bytes)))
fout.write(header_bytes)
def read_header(fin):
magic = fin.read(len(MAGIC))
if magic != MAGIC:
raise ValueError('Not a GCRY file (bad magic)')
raw = fin.read(4)
if len(raw) < 4:
raise ValueError('Truncated file (no header length)')
(hlen,) = struct.unpack('>I', raw)
header_bytes = fin.read(hlen)
if len(header_bytes) < hlen:
raise ValueError('Truncated header')
header = json.loads(header_bytes.decode('utf-8'))
return header, header_bytes
def make_header_stub(header_dict: Dict) -> bytes:
stub = dict(header_dict)
# Remove fields that will be added later or are sensitive
stub.pop('file_key_wrapped', None)
stub.pop('wrap_nonce', None)
stub.pop('recipients', None)
stub.pop('signature', None)
return json.dumps(stub, separators=(',',':')).encode('utf-8')
# choice of cipher
class CipherWrapper:
def __init__(self, name: str, key: bytes):
self.name = name
self.key = key
if name == 'aes':
self.impl = AESGCM(key)
elif name == 'chacha':
if not HAVE_XCHACHA:
raise ValueError('XChaCha20Poly1305 not available in this environment')
self.impl = XChaCha20Poly1305(key)
else:
raise ValueError('Unknown cipher')
def encrypt(self, nonce: bytes, data: bytes, aad: bytes) -> bytes:
return self.impl.encrypt(nonce, data, aad)
def decrypt(self, nonce: bytes, data: bytes, aad: bytes) -> bytes:
return self.impl.decrypt(nonce, data, aad)
# Encrypt file
def encrypt_file(inpath: str, outpath: str, password: Optional[str], *, cipher_name: str = 'aes', recipients: Optional[List[str]] = None, sign_key: Optional[str] = None, compress: bool = False, obfuscate: bool = False, chunk_size: int = CHUNK_SIZE):
password_b = password.encode('utf-8') if password is not None else None
file_key = os.urandom(32) # symmetric key per file
kek_salt = os.urandom(16) if password_b is not None else None
kek_iters = KDF_ITERS_DEFAULT
kek = derive_kek(password_b, kek_salt, iterations=kek_iters) if password_b is not None else None
wrap_nonce = os.urandom(12) if password_b is not None else None
payload_nonce_base = os.urandom(8)
filesize = os.path.getsize(inpath)
filename = os.path.basename(inpath)
# handle compression: stream input through gzip in-memory chunking is tricky; we'll compress whole file for prototype
if compress:
with open(inpath,'rb') as f:
orig_data = f.read()
compressed = gzip.compress(orig_data)
tmp_inpath = inpath + '.gcry_tmp'
with open(tmp_inpath, 'wb') as f:
f.write(compressed)
filesize = len(compressed)
inpath_to_use = tmp_inpath
else:
inpath_to_use = inpath
# recipients wrapping
recipients_meta = []
if recipients:
for r in recipients:
pub = load_rsa_pub(r)
# wrap file_key with RSA-OAEP
wrapped = pub.encrypt(file_key, OAEP(mgf=MGF1(algorithm=chashes.SHA256()), algorithm=chashes.SHA256(), label=None))
recipients_meta.append({'recipient': os.path.basename(r), 'wrapped_key': b64(wrapped), 'wrap_algo':'RSA-OAEP-SHA256'})
# initial header (without wrapped key)
header = {
'version': 2,
'alg': cipher_name.upper() + ('-GCM' if cipher_name=='aes' else '-XCHACHA20-POLY1305'),
'chunk_size': chunk_size,
'file_size': filesize,
'filename': filename if not obfuscate else None,
'kek_kdf': 'PBKDF2-HMAC-SHA256' if password_b is not None else None,
'kek_salt': b64(kek_salt) if kek_salt is not None else None,
'kek_iters': kek_iters if kek_salt is not None else None,
'payload_nonce_base': b64(payload_nonce_base),
'recipients': recipients_meta if recipients_meta else None,
'compress': bool(compress),
'obfuscated_filename': None,
'comment': 'insane-mode'
}
header_stub_bytes = make_header_stub(header)
# wrap file_key with KEK if password provided, authenticate with header_stub
wrapped = None
if kek is not None:
aes_kek = AESGCM(kek)
wrapped = aes_kek.encrypt(wrap_nonce, file_key, header_stub_bytes)
header['file_key_wrapped'] = b64(wrapped)
header['wrap_nonce'] = b64(wrap_nonce)
# if obfuscate filename: encrypt filename with file_key via AESGCM (new nonce)
if obfuscate and header.get('filename') is not None:
aes_file_tmp = AESGCM(file_key)
fn_nonce = os.urandom(12)
enc_fn = aes_file_tmp.encrypt(fn_nonce, header['filename'].encode('utf-8'), header_stub_bytes)
header['obfuscated_filename'] = {'enc': b64(enc_fn), 'nonce': b64(fn_nonce)}
header['filename'] = None
# possibly add recipients info already in header
header_bytes = json.dumps(header, separators=(',',':')).encode('utf-8')
# signature (Ed25519) over header_bytes
signature_b64 = None
if sign_key:
# load private key (could be RSA or Ed25519; we prefer Ed25519 here)
priv = load_ed25519_priv(sign_key)
if not isinstance(priv, ed25519.Ed25519PrivateKey):
raise ValueError('sign-key must be Ed25519 private key PEM')
sig = priv.sign(header_bytes)
header['signature'] = b64(sig)
signature_b64 = b64(sig)
header_bytes = json.dumps(header, separators=(',',':')).encode('utf-8')
with open(inpath_to_use, 'rb') as fin, open(outpath, 'wb') as fout:
write_header(fout, header)
# choose cipher wrapper
cipher = CipherWrapper('aes' if cipher_name=='aes' else 'chacha', file_key)
i = 0
while True:
chunk = fin.read(chunk_size)
if not chunk:
break
# build nonce
if cipher_name == 'aes':
nonce = payload_nonce_base + int_to_4be(i)
else:
# XChaCha requires 24-byte nonce: use payload_nonce_base (8) + counter (4) + zeros(12)
# Simpler: derive a 24-byte nonce: payload_nonce_base(8) || counter(4) || random(12)
# But randomness per chunk risks non-determinism; instead produce deterministic 24 bytes
nonce = payload_nonce_base + int_to_4be(i) + b'\x00'*12
cipher_chunk = cipher.encrypt(nonce, chunk, header_bytes)
fout.write(cipher_chunk)
i += 1
# cleanup temp compressed file
if compress:
try:
os.remove(tmp_inpath)
except Exception:
pass
print(f'Encrypted "{inpath}" -> "{outpath}" ({filesize} bytes, {i} chunks)')
if recipients_meta:
print(f'Wrapped for recipients: {",".join([r["recipient"] for r in recipients_meta])}')
if signature_b64:
print('Signed header (Ed25519)')
# Decrypt
def try_unwrap_recipients(recipients_meta: List[Dict], priv_key_paths: List[str]) -> Optional[bytes]:
# try each provided private key to unwrap one of recipients entries
for p in priv_key_paths:
try:
priv = load_rsa_priv(p)
except Exception:
continue
for r in recipients_meta:
try:
wrapped = ub64(r['wrapped_key'])
file_key = priv.decrypt(wrapped, OAEP(mgf=MGF1(algorithm=chashes.SHA256()), algorithm=chashes.SHA256(), label=None))
return file_key
except Exception:
continue
return None
def decrypt_file(inpath: str, outpath: str, password: Optional[str], *, cipher_pref: str = 'aes', priv_keys: Optional[List[str]] = None, verify_key: Optional[str] = None):
password_b = password.encode('utf-8') if password is not None else None
with open(inpath, 'rb') as fin:
header, header_bytes = read_header(fin)
header_stub_bytes = make_header_stub(header)
# attempt to recover file_key: 1) recipients via private keys 2) kek via password
file_key = None
if header.get('recipients') and priv_keys:
candidate = try_unwrap_recipients(header['recipients'], priv_keys)
if candidate:
file_key = candidate
if file_key is None and header.get('file_key_wrapped') and password_b is not None:
kek_salt = ub64(header['kek_salt'])
kek_iters = int(header.get('kek_iters', KDF_ITERS_DEFAULT))
kek = derive_kek(password_b, kek_salt, iterations=kek_iters)
wrap_nonce = ub64(header['wrap_nonce'])
wrapped = ub64(header['file_key_wrapped'])
aes_kek = AESGCM(kek)
try:
file_key = aes_kek.decrypt(wrap_nonce, wrapped, header_stub_bytes)
except Exception as e:
raise ValueError('Failed to unwrap file key. Wrong password or corrupted header.') from e
if file_key is None:
raise ValueError('Could not recover file key (no valid private key or password)')
# verify signature if present and verify_key provided
if header.get('signature') and verify_key:
pub = load_ed25519_pub(verify_key)
sig = ub64(header['signature'])
try:
pub.verify(sig, header_bytes)
print('Header signature OK')
except Exception:
raise ValueError('Signature verification failed')
payload_nonce_base = ub64(header['payload_nonce_base'])
chunk_size = int(header.get('chunk_size', CHUNK_SIZE))
cipher = CipherWrapper('aes' if header['alg'].lower().startswith('aes') else 'chacha', file_key)
with open(outpath, 'wb') as fout:
i = 0
while True:
to_read = chunk_size + 64 # generous read to capture tag or larger chacha overhead
data = fin.read(to_read)
if not data:
break
if header['alg'].lower().startswith('aes'):
nonce = payload_nonce_base + int_to_4be(i)
else:
nonce = payload_nonce_base + int_to_4be(i) + b'\x00'*12
try:
plain = cipher.decrypt(nonce, data, header_bytes)
except Exception as e:
raise ValueError(f'Failed to decrypt chunk {i}: corrupted or tampered file') from e
fout.write(plain)
i += 1
# if filename was obfuscated, try to decrypt it and display
if header.get('obfuscated_filename'):
enc = ub64(header['obfuscated_filename']['enc'])
fn_nonce = ub64(header['obfuscated_filename']['nonce'])
try:
fn = AESGCM(file_key).decrypt(fn_nonce, enc, header_stub_bytes).decode('utf-8')
print('Recovered filename:', fn)
except Exception:
pass
print(f'Decrypted "{inpath}" -> "{outpath}" ({i} chunks)')
# Info
def info_file(path: str):
with open(path, 'rb') as fin:
header, header_bytes = read_header(fin)
h = dict(header)
h.pop('file_key_wrapped', None)
h.pop('wrap_nonce', None)
print(json.dumps(h, indent=2))
# CLI
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='GCRY protótipo INSANO')
sub = parser.add_subparsers(dest='cmd')
p_enc = sub.add_parser('encrypt')
p_enc.add_argument('infile')
p_enc.add_argument('outfile')
p_enc.add_argument('--password', '-p', required=False)
p_enc.add_argument('--cipher', choices=['aes','chacha'], default='aes')
p_enc.add_argument('--recipients', help='comma-separated rsa pub pem paths', required=False)
p_enc.add_argument('--sign-key', help='ed25519 private key pem', required=False)
p_enc.add_argument('--compress', action='store_true')
p_enc.add_argument('--obfuscate', action='store_true')
p_dec = sub.add_parser('decrypt')
p_dec.add_argument('infile')
p_dec.add_argument('outfile')
p_dec.add_argument('--password', '-p', required=False)
p_dec.add_argument('--priv-keys', help='comma-separated rsa priv pem paths', required=False)
p_dec.add_argument('--verify-key', help='ed25519 public key pem', required=False)
p_info = sub.add_parser('info')
p_info.add_argument('file')
args = parser.parse_args()
if args.cmd == 'encrypt':
recs = args.recipients.split(',') if args.recipients else None
pwd = args.password if args.password is not None else input('Password (or blank to skip): ')
if pwd == '':
pwd = None
encrypt_file(args.infile, args.outfile, pwd, cipher_name=args.cipher, recipients=recs, sign_key=args.sign_key, compress=args.compress, obfuscate=args.obfuscate)
elif args.cmd == 'decrypt':
privs = args.priv_keys.split(',') if args.priv_keys else None
pwd = args.password if args.password is not None else input('Password (or blank to skip): ')
if pwd == '':
pwd = None
decrypt_file(args.infile, args.outfile, pwd, cipher_pref='aes', priv_keys=privs, verify_key=args.verify_key)
elif args.cmd == 'info':
info_file(args.file)
else:
parser.print_help()