发布于  更新于 

jumpserver中资产的ssh私钥和密码的解密

JumpServer 是全球首款开源的堡垒机

渗透过程中在有数据库权限的情况下如何解密?

通过对源代码的阅读,记录如下

如何查看jumpserver版本

cat apps/jumpserver/const.py

资产表是assets_asset

获取SECRET_KEY

config.yaml中可获取到SECRET_KEY

资产的用户表

在源码apps/assets/models/user.py中,

AdminUserSystemUser两个model对应数据库中的表是assets_adminuserassets_systemuser

二者均继承子BaseUser,都有private_keypassword字段

这两个字段为EncryptCharField类型,其组合继承了EncryptMixin

简单来说这种类型的字段,在入库时会加密,出库时会解密

EncryptCharField的加密和解密

算法主要代码在apps/common/utils/encode.pyapps/common/utils/crypto.py

  1. signer: 早期版本仅通过签名算法加签
  2. crypto: 较新版本通过加密算法加密

signer和crypto均使用SECRET_KEY作为密钥

在2.14.1中,为了兼容早期版本会先通过crypto解密,再通过signer解密

signer解密

准确的说,signer只是签名,并不加密。其格式为{base64_method}.{base64_data}.{hash}

  1. base64_method: 使用的哈希算法解base64后为 {"alg":"HS256"}"
  2. base64_data: 原始数据解base64即为明文
  3. hash: 签名值

crypto解密

算法主要为三种,解密依次尝试直到成功

  1. AES ECB
  2. AES GCM
  3. SM4 ECB

总结如下:

  1. 如果是signer格式密文,则可直接解密
  2. 如果是crypto格式密文,则需要SECRET_KEY

解密脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
import json
import base64
from Cryptodome.Cipher import AES
from Cryptodome.Util.Padding import pad
from Cryptodome.Random import get_random_bytes
from gmssl.sm4 import CryptSM4, SM4_ENCRYPT, SM4_DECRYPT


def signer_decode(val: str):
data = val.split(".")[1]
data = data + "=" * (-len(data) % 4)
return json.loads(base64.b64decode(data))


def process_key(key):
"""
返回32 bytes 的key
"""
if not isinstance(key, bytes):
key = bytes(key, encoding='utf-8')

if len(key) >= 32:
return key[:32]

return pad(key, 32)


class BaseCrypto:

def encrypt(self, text):
return base64.urlsafe_b64encode(
self._encrypt(bytes(text, encoding='utf8'))).decode('utf8')

def _encrypt(self, data: bytes) -> bytes:
raise NotImplementedError

def decrypt(self, text):
return self._decrypt(
base64.urlsafe_b64decode(bytes(text,
encoding='utf8'))).decode('utf8')

def _decrypt(self, data: bytes) -> bytes:
raise NotImplementedError


class GMSM4EcbCrypto(BaseCrypto):

def __init__(self, key):
self.key = process_key(key)
self.sm4_encryptor = CryptSM4()
self.sm4_encryptor.set_key(self.key, SM4_ENCRYPT)

self.sm4_decryptor = CryptSM4()
self.sm4_decryptor.set_key(self.key, SM4_DECRYPT)

def _encrypt(self, data: bytes) -> bytes:
return self.sm4_encryptor.crypt_ecb(data)

def _decrypt(self, data: bytes) -> bytes:
return self.sm4_decryptor.crypt_ecb(data)


class AESCrypto:
"""
AES
除了MODE_SIV模式key长度为:32, 48, or 64,
其余key长度为16, 24 or 32
详细见AES内部文档
CBC模式传入iv参数
本例使用常用的ECB模式
"""

def __init__(self, key):
if len(key) > 32:
key = key[:32]
self.key = self.to_16(key)

@staticmethod
def to_16(key):
"""
转为16倍数的bytes数据
:param key:
:return:
"""
key = bytes(key, encoding="utf8")
while len(key) % 16 != 0:
key += b'\0'
return key # 返回bytes

def aes(self):
return AES.new(self.key, AES.MODE_ECB) # 初始化加密器

def encrypt(self, text):
aes = self.aes()
return str(base64.encodebytes(aes.encrypt(self.to_16(text))),
encoding='utf8').replace('\n', '') # 加密

def decrypt(self, text):
aes = self.aes()
return str(
aes.decrypt(base64.decodebytes(bytes(
text, encoding='utf8'))).rstrip(b'\0').decode("utf8")) # 解密


class AESCryptoGCM:
"""
使用AES GCM模式
"""

def __init__(self, key):
self.key = process_key(key)

def encrypt(self, text):
"""
加密text,并将 header, nonce, tag (3*16 bytes, base64后变为 3*24 bytes)
附在密文前。解密时要用到。
"""
header = get_random_bytes(16)
cipher = AES.new(self.key, AES.MODE_GCM)
cipher.update(header)
ciphertext, tag = cipher.encrypt_and_digest(
bytes(text, encoding='utf-8'))

result = []
for byte_data in (header, cipher.nonce, tag, ciphertext):
result.append(base64.b64encode(byte_data).decode('utf-8'))

return ''.join(result)

def decrypt(self, text):
"""
提取header, nonce, tag并解密text。
"""
metadata = text[:72]
header = base64.b64decode(metadata[:24])
nonce = base64.b64decode(metadata[24:48])
tag = base64.b64decode(metadata[48:])
ciphertext = base64.b64decode(text[72:])

cipher = AES.new(self.key, AES.MODE_GCM, nonce=nonce)

cipher.update(header)
plain_text_bytes = cipher.decrypt_and_verify(ciphertext, tag)
return plain_text_bytes.decode('utf-8')


def get_aes_crypto(key, mode='GCM'):
if mode == 'ECB':
a = AESCrypto(key)
elif mode == 'GCM':
a = AESCryptoGCM(key)
return a


def get_gm_sm4_ecb_crypto(key):
return GMSM4EcbCrypto(key)


class Crypto:

def __init__(self, cryptoes):
self.cryptoes = cryptoes

@property
def encryptor(self):
return self.cryptoes[0]

def encrypt(self, text):
return self.encryptor.encrypt(text)

def decrypt(self, text):
for decryptor in self.cryptoes:
try:
origin_text = decryptor.decrypt(text)
if origin_text:
# 有时不同算法解密不报错,但是返回空字符串
return origin_text
except (TypeError, ValueError, UnicodeDecodeError, IndexError):
continue


if __name__ == "__main__":
# SECRET_KEY
key = 'jM4YjFmYTE5OWRjODcyYTdkZWE2NjMxOGNhN2EyYWFmOTWE2N'
# private_key 或 password 在数据库里的值
data = [
'eyJhbGciOiJIUzI1NiJ9.InBhc3N3b3JkIg.ME3PUOsChmTHlWimu8W1K6pTUgm9fKPnuzyQKglA7ww',
'Dz6x4Uf+fUT7S/djyxv82w=='
]
crypto = Crypto([
get_aes_crypto(key, mode='ECB'),
get_aes_crypto(key, mode='GCM'),
get_gm_sm4_ecb_crypto(key),
])
for d in data:
if d.count(".") == 2:
print('by signer:', signer_decode(d))
else:
print('by crypto:', crypto.decrypt(d))