|
| 1 | +# Copyright lowRISC contributors (OpenTitan project). |
| 2 | +# Licensed under the Apache License, Version 2.0, see LICENSE for details. |
| 3 | +# SPDX-License-Identifier: Apache-2.0 |
| 4 | + |
| 5 | +from sw.host.penetrationtests.python.fi.communication.fi_asym_cryptolib_commands import ( |
| 6 | + OTFIAsymCrypto, |
| 7 | +) |
| 8 | +from python.runfiles import Runfiles |
| 9 | +from sw.host.penetrationtests.python.util import targets |
| 10 | +from sw.host.penetrationtests.python.util import common_library |
| 11 | +from sw.host.penetrationtests.python.util.gdb_controller import GDBController |
| 12 | +import json |
| 13 | +import argparse |
| 14 | +import sys |
| 15 | +import os |
| 16 | +import time |
| 17 | +from Crypto.PublicKey import ECC |
| 18 | +from Crypto.Signature import DSS |
| 19 | +from Crypto.Hash import SHA384 |
| 20 | + |
| 21 | +ignored_keys_set = set(["status"]) |
| 22 | +opentitantool_path = "" |
| 23 | + |
| 24 | +target = None |
| 25 | +asymfi = None |
| 26 | + |
| 27 | +# Read in the extra arguments from the opentitan_test. |
| 28 | +parser = argparse.ArgumentParser() |
| 29 | +parser.add_argument("--bitstream", type=str) |
| 30 | +parser.add_argument("--bootstrap", type=str) |
| 31 | + |
| 32 | +args, config_args = parser.parse_known_args() |
| 33 | + |
| 34 | +BITSTREAM = args.bitstream |
| 35 | +BOOTSTRAP = args.bootstrap |
| 36 | + |
| 37 | + |
| 38 | +# Preparing the input for an invalid signature |
| 39 | +key = ECC.generate(curve="P-384") |
| 40 | +pubx = [x for x in key.pointQ.x.to_bytes(48, "little")] |
| 41 | +puby = [x for x in key.pointQ.y.to_bytes(48, "little")] |
| 42 | +message = [i for i in range(16)] |
| 43 | +h = SHA384.new(bytes(message)) |
| 44 | +signer = DSS.new(key, "fips-186-3") |
| 45 | +signature = [x for x in signer.sign(h)] |
| 46 | +# Corrupt the signature for FiSim Testing |
| 47 | +signature[0] ^= 0x1 |
| 48 | +r_bytes = signature[:48] |
| 49 | +s_bytes = signature[48:] |
| 50 | +r_bytes.reverse() |
| 51 | +s_bytes.reverse() |
| 52 | +cfg = 0 |
| 53 | +trigger = 1 |
| 54 | +h = SHA384.new(bytes(message)) |
| 55 | +message_digest = [x for x in h.digest()] |
| 56 | + |
| 57 | + |
| 58 | +def trigger_testos_init(print_output=True): |
| 59 | + # Initializing the testOS (setting up the alerts and accelerators) |
| 60 | + (device_id, _, _, _, _, _, _) = asymfi.init( |
| 61 | + alert_config=common_library.no_escalation_alert_config |
| 62 | + ) |
| 63 | + if print_output: |
| 64 | + print("Output from init ", device_id) |
| 65 | + |
| 66 | + |
| 67 | +def trigger_p384_verify(): |
| 68 | + asymfi.handle_p384_verify(pubx, puby, r_bytes, s_bytes, message_digest, cfg, trigger) |
| 69 | + |
| 70 | + |
| 71 | +def read_testos_output(): |
| 72 | + # Read the output from the operation |
| 73 | + response = target.read_response(max_tries=500) |
| 74 | + return response |
| 75 | + |
| 76 | + |
| 77 | +if __name__ == "__main__": |
| 78 | + r = Runfiles.Create() |
| 79 | + # Get the openocd path. |
| 80 | + openocd_path = r.Rlocation("lowrisc_opentitan/third_party/openocd/build_openocd/bin/openocd") |
| 81 | + # Get the openocd config files. |
| 82 | + # The first file is on the cw340 (this is specific to the cw340) |
| 83 | + CONFIG_FILE_CHIP = r.Rlocation("lowrisc_opentitan/util/openocd/board/cw340_ftdi.cfg") |
| 84 | + # The config for the earlgrey design |
| 85 | + CONFIG_FILE_DESIGN = r.Rlocation("lowrisc_opentitan/util/openocd/target/lowrisc-earlgrey.cfg") |
| 86 | + # Get the opentitantool path. |
| 87 | + opentitantool_path = r.Rlocation("lowrisc_opentitan/sw/host/opentitantool/opentitantool") |
| 88 | + # The path for GDB and the default port (set up by OpenOCD) |
| 89 | + GDB_PATH = r.Rlocation("lowrisc_rv32imcb_toolchain/bin/riscv32-unknown-elf-gdb") |
| 90 | + GDB_PORT = 3333 |
| 91 | + # Directory for the trace log files |
| 92 | + log_dir = os.environ.get("TEST_UNDECLARED_OUTPUTS_DIR") |
| 93 | + pc_trace_file = os.path.join(log_dir, "pc_trace.log") |
| 94 | + # Directory for the output results |
| 95 | + test_results_file = os.path.join(log_dir, "test_results.log") |
| 96 | + # Program the bitstream for FPGAs. |
| 97 | + bitstream_path = None |
| 98 | + if BITSTREAM: |
| 99 | + bitstream_path = r.Rlocation("lowrisc_opentitan/" + BITSTREAM) |
| 100 | + # Get the firmware path. |
| 101 | + firmware_path = r.Rlocation("lowrisc_opentitan/" + BOOTSTRAP) |
| 102 | + # Get the disassembly path. |
| 103 | + dis_path = firmware_path.replace(".img", ".dis") |
| 104 | + # And the path for the elf. |
| 105 | + elf_path = firmware_path.replace(".img", ".elf") |
| 106 | + |
| 107 | + if "fpga" in BOOTSTRAP: |
| 108 | + target_type = "fpga" |
| 109 | + else: |
| 110 | + target_type = "chip" |
| 111 | + |
| 112 | + target_cfg = targets.TargetConfig( |
| 113 | + target_type=target_type, |
| 114 | + interface_type="hyperdebug", |
| 115 | + fw_bin=firmware_path, |
| 116 | + opentitantool=opentitantool_path, |
| 117 | + bitstream=bitstream_path, |
| 118 | + tool_args=config_args, |
| 119 | + openocd=openocd_path, |
| 120 | + openocd_chip_config=CONFIG_FILE_CHIP, |
| 121 | + openocd_design_config=CONFIG_FILE_DESIGN, |
| 122 | + ) |
| 123 | + |
| 124 | + target = targets.Target(target_cfg) |
| 125 | + asymfi = OTFIAsymCrypto(target) |
| 126 | + successful_faults = 0 |
| 127 | + total_attacks = 0 |
| 128 | + |
| 129 | + # How to read outputs in this script: |
| 130 | + # To view the UART output from the testOS or the chip in general, use: |
| 131 | + # target.print_all() or print(read_testos_output()) |
| 132 | + # In order to print the OpenOCD output use print(target.read_openocd()) |
| 133 | + # In order to print the output from GDB use print(gdb.read_output()) or |
| 134 | + # when you want to know the output from a gdb.send_command() print it: |
| 135 | + # print(gdb.send_command()) |
| 136 | + |
| 137 | + # What to do when running into errors: |
| 138 | + # - If device is busy or seeing "rejected 'gdb' connection, no more connections allowed", |
| 139 | + # cut the USB connection, e.g., sudo fuser /dev/ttyUSB0 and kill the PID |
| 140 | + # - If the port is busy check sudo lsof -i :3333 and then kill the PID |
| 141 | + |
| 142 | + try: |
| 143 | + # Program the bitstream, flash the target, and set up OpenOCD |
| 144 | + target.initialize_target() |
| 145 | + |
| 146 | + # Initialize the testOS |
| 147 | + trigger_testos_init() |
| 148 | + |
| 149 | + # Connect to GDB |
| 150 | + gdb = GDBController(gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path) |
| 151 | + |
| 152 | + # Provide the function name and extract the start and end address from the dis file |
| 153 | + function_name = "p384_scalar_write" |
| 154 | + # otcrypto_ecdsa_p384_verify (second one) |
| 155 | + # OUTLINED_FUNCTION_22 (second one) |
| 156 | + # hardened_memeq (not working yet) |
| 157 | + # p384_scalar_write (any one) |
| 158 | + # Gives back an array of hits where the function is called |
| 159 | + trace_addresses = gdb.get_function_addresses(dis_path, function_name) |
| 160 | + print("Start and stop addresses of ", function_name, ": ", trace_addresses) |
| 161 | + |
| 162 | + p384_observation_addresses = gdb.get_function_addresses( |
| 163 | + dis_path, "otcrypto_ecdsa_p384_verify" |
| 164 | + ) |
| 165 | + crash_observation_address = gdb.get_function_start_address( |
| 166 | + dis_path, "ottf_exception_handler" |
| 167 | + ) |
| 168 | + |
| 169 | + # Start the tracing |
| 170 | + # We set a short timeout to detect whether GDB has connected properly |
| 171 | + # and a long timeout for the entire tracing |
| 172 | + initial_timeout = 2 |
| 173 | + total_timeout = 60 * 30 |
| 174 | + |
| 175 | + # We pick the second address hit |
| 176 | + # TODO: Find a way to pick the correct one |
| 177 | + gdb.setup_pc_trace(pc_trace_file, trace_addresses[1][0], trace_addresses[1][1]) |
| 178 | + gdb.send_command("c", check_response=False) |
| 179 | + |
| 180 | + # Trigger the p384 verify from the testOS (we do not read its output) |
| 181 | + trigger_p384_verify() |
| 182 | + |
| 183 | + start_time = time.time() |
| 184 | + initial_timeout_stopped = False |
| 185 | + total_timeout_stopped = False |
| 186 | + |
| 187 | + # Run the tracing to get the trace log |
| 188 | + # Sometimes the tracing fails due to race conditions, |
| 189 | + # we have a quick initial timeout to catch this |
| 190 | + while time.time() - start_time < initial_timeout: |
| 191 | + output = gdb.read_output() |
| 192 | + if "Breakpoint 1, " in output: |
| 193 | + initial_timeout_stopped = True |
| 194 | + break |
| 195 | + if not initial_timeout_stopped: |
| 196 | + print("No initial break point found, can be a misfire, try again") |
| 197 | + sys.exit(1) |
| 198 | + while time.time() - start_time < total_timeout: |
| 199 | + output = gdb.read_output() |
| 200 | + if "PC trace complete" in output: |
| 201 | + print("\nTrace complete") |
| 202 | + total_timeout_stopped = True |
| 203 | + break |
| 204 | + if not total_timeout_stopped: |
| 205 | + print("Final tracing timeout reached") |
| 206 | + sys.exit(1) |
| 207 | + |
| 208 | + # Parse and truncate the trace log to get all PCs in a list |
| 209 | + pc_list = gdb.parse_pc_trace_file(pc_trace_file) |
| 210 | + # Get the unique PCs out |
| 211 | + pc_list = list(set(pc_list)) |
| 212 | + |
| 213 | + # TODO: Make this truncate for wait cycles and identify the instance count ... |
| 214 | + print("Tracing has a total of", len(pc_list), "PCs", flush=True) |
| 215 | + |
| 216 | + if len(pc_list) <= 0: |
| 217 | + print("Found no tracing, stopping") |
| 218 | + sys.exit(1) |
| 219 | + |
| 220 | + # Reset the target, flush the output, and close gdb |
| 221 | + gdb.reset_target() |
| 222 | + target.dump_all() |
| 223 | + trigger_testos_init(print_output=False) |
| 224 | + gdb.close_gdb() |
| 225 | + gdb = GDBController(gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path) |
| 226 | + |
| 227 | + # Open the results file |
| 228 | + test_results = open(test_results_file, "w") |
| 229 | + |
| 230 | + idx = 0 |
| 231 | + while idx < len(pc_list): |
| 232 | + print("-" * 80) |
| 233 | + print("Applying instruction skip in ", pc_list[idx]) |
| 234 | + print("-" * 80) |
| 235 | + |
| 236 | + prefix_observation = "fisim_result:" |
| 237 | + function_output_observation = "function output detected" |
| 238 | + crash_observation = "crash detected" |
| 239 | + |
| 240 | + try: |
| 241 | + # TODO Adding the observation points creates issues in the flow of the instructions |
| 242 | + # The observation points |
| 243 | + # observations = { |
| 244 | + # # Function output |
| 245 | + # p384_observation_addresses[1][1]: f"{function_output_observation}", |
| 246 | + # # Crash check |
| 247 | + # crash_observation_address: f"{crash_observation}", |
| 248 | + # } |
| 249 | + # gdb.add_observation(observations) |
| 250 | + |
| 251 | + # TODO Handle multiple same addresses |
| 252 | + gdb.apply_instruction_skip( |
| 253 | + pc_list[idx], gdb.parse_next_instruction(pc_list[idx], dis_path) |
| 254 | + ) |
| 255 | + gdb.send_command("c", check_response=False) |
| 256 | + |
| 257 | + # The instruction skip loop |
| 258 | + trigger_p384_verify() |
| 259 | + time.sleep(0.02) |
| 260 | + testos_response = read_testos_output() |
| 261 | + |
| 262 | + gdb_response = gdb.read_output() |
| 263 | + if ( |
| 264 | + 'stopped,reason="breakpoint-hit"' in gdb_response |
| 265 | + and "instruction skip applied" in gdb_response |
| 266 | + ): |
| 267 | + idx += 1 |
| 268 | + total_attacks += 1 |
| 269 | + testos_response_json = json.loads(testos_response) |
| 270 | + print("Output:", testos_response_json, flush=True) |
| 271 | + verification_result = testos_response_json["result"] |
| 272 | + if verification_result: |
| 273 | + successful_faults += 1 |
| 274 | + print("-" * 80) |
| 275 | + print("Successful FI attack!") |
| 276 | + print("Location:", pc_list[idx - 1]) |
| 277 | + print(gdb_response) |
| 278 | + print("Response:", testos_response_json) |
| 279 | + print("-" * 80) |
| 280 | + test_results.write(f"{pc_list[idx - 1]}: {testos_response_json}\n") |
| 281 | + |
| 282 | + # Reset GDB by closing and opening again |
| 283 | + gdb.close_gdb() |
| 284 | + gdb = GDBController(gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path) |
| 285 | + # We do not need to reset the target since it returned an output |
| 286 | + else: |
| 287 | + print("No break point found, something went wrong", flush=True) |
| 288 | + gdb.close_gdb() |
| 289 | + gdb = GDBController(gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path) |
| 290 | + gdb.reset_target() |
| 291 | + target.dump_all() |
| 292 | + trigger_testos_init(print_output=False) |
| 293 | + # Reset again |
| 294 | + gdb.close_gdb() |
| 295 | + gdb = GDBController(gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path) |
| 296 | + |
| 297 | + except json.JSONDecodeError: |
| 298 | + print("Error: JSON decoding failed. Invalid response format", flush=True) |
| 299 | + gdb.close_gdb() |
| 300 | + gdb = GDBController(gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path) |
| 301 | + gdb.reset_target() |
| 302 | + target.dump_all() |
| 303 | + trigger_testos_init(print_output=False) |
| 304 | + # Reset again |
| 305 | + gdb.close_gdb() |
| 306 | + gdb = GDBController(gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path) |
| 307 | + |
| 308 | + except TimeoutError: |
| 309 | + print("Timeout error, retrying", flush=True) |
| 310 | + gdb.close_gdb() |
| 311 | + target.close_openocd() |
| 312 | + target.initialize_target() |
| 313 | + trigger_testos_init() |
| 314 | + target.dump_all() |
| 315 | + gdb = GDBController(gdb_path=GDB_PATH, gdb_port=GDB_PORT, elf_file=elf_path) |
| 316 | + |
| 317 | + finally: |
| 318 | + print("-" * 80) |
| 319 | + print("Trace data is logged in ", pc_trace_file) |
| 320 | + print("Instruction skip results are logged in ", test_results_file) |
| 321 | + print(f"Total attacks {total_attacks}, successful attacks {successful_faults}") |
| 322 | + print("You can find the dissassembly in ", dis_path) |
| 323 | + # Close the OpenOCD and GDB connection at the end |
| 324 | + target.close_openocd() |
0 commit comments