Skip to content

Commit 94957ac

Browse files
committed
chore: update link for ics-mem-collect
mirror now that github is archived
1 parent 30a08f9 commit 94957ac

File tree

5 files changed

+438
-1
lines changed

5 files changed

+438
-1
lines changed

tools/analysis/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ Developed as a community asset
1212
- [Firmadyne](https://github.com/firmadyne/firmadyne) - System for emulation and dynamic analysis of Linux-based firmware
1313
- [ICSREF](https://github.com/momalab/ICSREF) - ICSREF is a modular framework that automates the reverse engineering process of CODESYS binaries compiled with the CODESYS v2 compiler.
1414
- [Thomas Roth - Embedded Serial Gateways](https://github.com/nezza/scada-stuff) - Various scripts for analysis of some serial embedded gateways. See also: [video](https://media.ccc.de/v/34c3-8956-scada_-_gateway_to_s_hell)
15-
- [ICS Mem Collect](https://github.com/fireeye/ics_mem_collect) - Memory Analysis Collection Toolkit for GE D20MX platform from FireEye.
15+
- [ICS Mem Collect](https://github.com/mandiant/ics_mem_collect) - Memory Analysis Collection Toolkit for GE D20MX platform from FireEye. Now archived, so mirrored [here](/tools/mirrored/ics-mem-collect)
1616
- [Recon 2017 Work on Protection Relays](https://github.com/rigmar/Recon2017) - More information [here](http://www.scada.sl/2017/10/hopeless-relay-protection-for.html)
1717
- [EMBA](https://github.com/e-m-b-a/emba) - The security analyzer for embedded device firmware + [EMBArk](https://github.com/e-m-b-a/embark) - the web based enterprise interface for EMBA
1818

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
# ics_mem_collect
2+
3+
For many industrial control system devices, there is not a simple solution for programmatically accessing memory. Without an API, an incident responder or digital forensics analyst may be required to manually probe memory looking for anomalies or malicious activity. This project is intended to develop APIs that allow an analyst to adapt pre-existing tools or rapidly build new tools in order to target these devices.
4+
5+
Current Devices:
6+
* GE D20MX
7+
8+
Future Work:
9+
* JTAG Interface
Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
import binascii
2+
import collections
3+
import serial
4+
import socket
5+
import struct
6+
import time
7+
import re
8+
9+
page_size = 0x1000
10+
MEM_SIZE = 0x40000000
11+
12+
13+
def page_align(addr):
14+
return addr & 0xFFFFF000
15+
16+
re_prompt = re.compile("^D20M>", re.MULTILINE)
17+
18+
class fd_d20mx_cache(object):
19+
def __init__(self):
20+
self.f_cache = open("d20mx.cache", "r+b", 0)
21+
self.f_cache.seek(0)
22+
self.f_cache.truncate(MEM_SIZE)
23+
self.f_status = open("d20mx.cache.status", "r+b", 0)
24+
self.f_status.seek(0)
25+
self.f_status.truncate(MEM_SIZE / page_size)
26+
27+
def _page_num(self, addr):
28+
return page_align(addr) / page_size
29+
30+
def set_page_cached(self, addr, status):
31+
if page_align(addr) != addr:
32+
print "page unaligned addr"
33+
return False
34+
if status not in [True, False]:
35+
print "status error, must be true or false"
36+
return False
37+
else:
38+
# print "writing status for %08x to %s" % (addr, status)
39+
self.f_status.seek(self._page_num(addr))
40+
b = struct.pack("B", 1 if status == True else 0)
41+
self.f_status.write(b)
42+
self.f_status.flush()
43+
return True
44+
45+
def is_page_cached(self, addr):
46+
self.f_status.seek(self._page_num(addr))
47+
# print "Checking page %08x located at %08x" % (self.f_status.tell(), addr)
48+
data = self.f_status.read(1)
49+
status = True if data == '\x01' else False
50+
return status
51+
52+
def cache_page(self, addr, bytez):
53+
if len(bytez) != page_size:
54+
print "Not enough bytez, expected 0x%08x got 0x%08x" % (page_size, len(bytez))
55+
return
56+
if page_align(addr) != addr:
57+
print "addr was not page aligned"
58+
return
59+
tell = self.tell()
60+
self.seek(addr)
61+
self.f_cache.write(bytez)
62+
self.seek(tell)
63+
self.set_page_cached(addr, True)
64+
return True
65+
66+
def read(self, n=-1):
67+
if self.is_page_cached(page_align(self.f_cache.tell())):
68+
return self.f_cache.read(n)
69+
else:
70+
print "ATTEMPT TO READ UNPAGED MEM"
71+
72+
def write(self, b):
73+
self.f_cache.write(b)
74+
75+
def seek(self, offset, whence=0):
76+
return self.f_cache.seek(offset, whence)
77+
78+
def tell(self):
79+
return self.f_cache.tell()
80+
81+
82+
class fd_d20mx_transport(object):
83+
def __init__(self):
84+
self.mem = fd_d20mx_cache()
85+
86+
def seek(self, offset, whence=0):
87+
self.mem.seek(offset, whence)
88+
89+
def tell(self):
90+
return self.mem.tell()
91+
92+
def read(self, length):
93+
pages = [x for x in range(page_align(self.tell()), self.tell() + length, page_size)]
94+
for page in pages:
95+
if self.mem.is_page_cached(page):
96+
pass
97+
#print "[!] [%08x] PAGE CACHED" % page_align(self.tell())
98+
else:
99+
#print "[X] [%08x} PAGE NOT CACHED" % page_align(self.tell())
100+
self._cache_page(page)
101+
data = self.mem.read(length)
102+
return data
103+
def write(self, data):
104+
self.mem.write(data)
105+
106+
class fd_d20mx_serial(fd_d20mx_transport):
107+
def __init__(self, **parameters):
108+
super(fd_d20mx_serial, self).__init__()
109+
parameters['port'] = None # "/dev/ttyUSB0"
110+
parameters['baudrate'] = 115200
111+
parameters['xonxoff'] = True
112+
113+
# Initalize the serial device
114+
self.ser = serial.Serial(**parameters)
115+
self.ser.write("\r\n")
116+
time.sleep(1)
117+
prompt = self.ser.read(self.ser.in_waiting)
118+
print prompt
119+
if "D20M>" not in prompt:
120+
raise Exception("Invalid prompt")
121+
# Create cache
122+
123+
def _cache_page(self, addr):
124+
self.ser.write("d %08x %08x\r\n" % (page_align(self.mem.tell()), page_align(self.mem.tell()) + page_size))
125+
self.ser.read_until("\r\n")
126+
udata = self.ser.read_until("D20M>")
127+
bdata = ''
128+
print udata
129+
if "data access" in udata:
130+
print "GUARD PAGE"
131+
else:
132+
for line in udata.splitlines():
133+
if line.startswith("value"): continue
134+
if line.startswith("D20M>"): continue
135+
if line == "": continue
136+
bdata += line[9:9 + 2 + 3 * 16].replace(" ", "")
137+
bytez = binascii.unhexlify(bdata)
138+
self.mem.cache_page(addr, bytez)
139+
# print "CACHING PAGE"
140+
141+
142+
class fd_d20mx_tcp(fd_d20mx_transport):
143+
def __init__(self, s=None):
144+
super(fd_d20mx_tcp, self).__init__()
145+
self.s = s
146+
if not s:
147+
self.s = socket.socket()
148+
self.s.settimeout(60)
149+
if not s:
150+
self.s.connect(("127.0.0.1", 4444))
151+
print "Connected to loopback:4444"
152+
self.s.send("\r\n")
153+
time.sleep(1)
154+
data = ''
155+
for i in range(10):
156+
self.s.send("\r\n")
157+
try:
158+
data += self.s.recv(1000)
159+
except socket.timeout as e:
160+
print e.message
161+
if "D20M>" in data:
162+
break
163+
else:
164+
raise Exception("Invalid prompt")
165+
166+
self.mem = fd_d20mx_cache()
167+
168+
def _cache_page(self, addr):
169+
print "[X] [%08x] Caching page" % page_align(addr)
170+
#print "d %08x %08x\r\n" % (page_align(self.mem.tell()), page_align(self.mem.tell()) + page_size)
171+
self.s.send("d %08x %08x" % (page_align(self.mem.tell()), page_align(self.mem.tell()) + page_size))
172+
time.sleep(1)
173+
self.s.recv(1000)
174+
self.s.send("\r\n")
175+
176+
udata = ''
177+
#udata = self.s.recv(1000)
178+
#udata = udata[udata.index("\r\n"):]
179+
while True:
180+
udata += self.s.recv(1000)
181+
if re_prompt.search(udata):
182+
#udata = udata[:udata.index("D20M>")]
183+
break
184+
bdata = ''
185+
if "data access" in udata:
186+
print "GUARD PAGE"
187+
else:
188+
for line in udata.splitlines():
189+
if line.startswith("value"): continue
190+
if line.startswith("d "): continue
191+
if line.startswith("D20M>"): continue
192+
if line == "": continue
193+
if len(bdata) % 2: break
194+
match = re.match("[0-9A-F]{8} (([0-9a-f]{2} ( )?){16})", line)
195+
if match:
196+
print "GROUP", match.group(1)
197+
bdata += line[9:9 + 2 + 3 * 16].replace(" ", "")
198+
bytez = binascii.unhexlify(bdata)
199+
self.mem.cache_page(addr, bytez)
200+
def _write(self, data):
201+
print "[X] [%08x] Edit mem" % addr
202+
#print "d %08x %08x\r\n" % (page_align(self.mem.tell()), page_align(self.mem.tell()) + page_size)
203+
#self.s.send("eds\r\n")
204+
#self.s.send("c\r\n")
205+
#self.s.recv(1000)
206+
#self.s.send("f %08x,1" % (page_align(self.mem.tell()), page_align(self.mem.tell()) + page_size))
207+
#self.s.send("\r\n")
208+
#send_data = '\r\n'.join(["%X" % x for x in data])
209+
#print send_data
210+
#print "."
211+
#self.s.send(".")
212+
#self.s.send("quit")
213+
#self.s.send("3")
214+
#try:
215+
# while True:
216+
# self.s.recv(1000)
217+
#except:
218+
# print "recvd all"
219+
# pass
220+
#
221+
#self.s.send("\r\n")
222+
#rdata = self.s.recv(1000)
223+
#groups = prompt.match(rdata)
224+
#if groups is None:
225+
# raise Exception("Bad prompt returning from write")
226+
self.mem.write(data)
227+
228+
229+

0 commit comments

Comments
 (0)