-
Notifications
You must be signed in to change notification settings - Fork 0
/
gen_os
344 lines (289 loc) · 13.7 KB
/
gen_os
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
#!/usr/bin/env python
# This script implements the final step in an image build process, where,
# depending on the target and user's options, the image is signed and stitched.
#
# A QUICK NOTE FOR DEVELOPERS
#
# This script has been written using design choices that would make the
# development of a new signing method quick and easy. To create a new signing
# handler:
#
# * Create a class that derives from the NoneHandler class, and override the
# stitch() method with your flavor of signing (see IsuHandler for an example).
#
# * Decorate the class with @Stitch.register(handler_name), where handler_name
# is the name of your handler. This automatically adds your handler to the
# list of handlers available for signing on the command-line.
#
# * If you need to call an external program for signing, this program should
# support streaming, i.e. the program shall be able to read from stdin and
# write to stdout. This makes it easy to communicate with the program through
# ssh. If the program doesn't support streaming, you can create a wrapper
# around the program that supports streaming. For an example, see isu_stream
# in device/intel/PRIVATE/intel_signing_utility.
import os
import sys
import logging
import argparse
import binascii
import hashlib
import tempfile
import shutil
import subprocess
import xml.etree.ElementTree as ElementTree
import re
# Master switch for debug tracing. Leave it False for some speedups: when it
# is off, logging_debug() returns without even querying the logger.
DEBUG = False


def logging_debug(*args):
    """Forward *args to logging.debug() when debug tracing is enabled.

    A message is only emitted when DEBUG is true AND the root logger level is
    strictly below logging.DEBUG. So, to see debug messages, set the logging
    level manually to logging.DEBUG - 1.
    """
    if not DEBUG:
        return
    root_level = logging.getLogger().level
    if root_level < logging.DEBUG:
        logging.debug(*args)
# Locate the prebuilt executables on the host machine. Only Linux on x86 is
# accepted, since that is what 'isu' and 'xfstk-stitcher' are compiled for;
# importing this module on any other platform fails right away.
system, _, _, _, machine = os.uname()
if 'Linux' not in system or '86' not in machine:
    raise SystemError("Unsupported platform: %s %s" % (system, machine))
HOST_OUT_EXECUTABLES = os.path.join('out', 'host', 'linux-x86', 'bin')
def file_pathname(arg):
    """argparse type checker: accept *arg* only if it names an existing file.

    Returns *arg* unchanged when os.path.isfile() accepts it, and raises
    argparse.ArgumentTypeError otherwise.
    """
    if os.path.isfile(arg):
        return arg
    message = "%s doesn't exist or is not a file" % (arg)
    raise argparse.ArgumentTypeError(message)
def _run(cmd, cwd=None, input=None):
    """Run a command and return its exit code, output, and errput.

    cmd   -- the command to spawn. A string is handed to the shell; anything
             else (typically an argument list) is executed directly.
    cwd   -- working directory of the child process, or None to inherit the
             current one.
    input -- data written to the child's stdin, or None to send nothing.

    Returns a (returncode, stdout, stderr) tuple. The call blocks until the
    child process has exited.
    """
    # 'basestring' is a Python 2 builtin; fall back to 'str' where it does not
    # exist so that deciding whether to use the shell never raises NameError.
    try:
        string_types = basestring
    except NameError:
        string_types = str

    logging_debug("Spawning child process: %r", cmd)
    logging_debug(" cwd=%s", cwd)
    p = subprocess.Popen(cmd, shell=isinstance(cmd, string_types), cwd=cwd,
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
    # communicate() feeds stdin and drains both pipes until the child exits.
    out, err = p.communicate(input)
    logging_debug(" exit code=%s", p.returncode)
    return p.returncode, out, err
# Check whether an environment variable is set...
def _env(varname):
try:
value = os.environ[varname].lower()
return value in ["1", "y", "yes", "true"]
except:
return False
# Bring the signing environment to handlers that support signing.
#   USE_SIGNER -- true value: the signature is delegated to the signer.
#   LOCAL_SIGN -- true value: a signature is requested (USE_SIGNER implies it).
_SIGNATURE_BY_SIGNER = _env('USE_SIGNER')
_SIGNATURE_REQUESTED = _env('LOCAL_SIGN') or _SIGNATURE_BY_SIGNER
# "login@server" built from SIGNER_LOGIN and SIGNER_SERVER, or None when
# either of them is missing from the environment.
try:
    _SIGNER = "%s@%s" % (os.environ['SIGNER_LOGIN'], os.environ['SIGNER_SERVER'])
except KeyError:
    _SIGNER = None
# The Stitch class keeps track of supported handlers. Under the hood, the
# developer just needs to decorate his handler class and the Stitch class will
# make it available on the command-line.
class Stitch(object):
    """Registry that maps handler names to handler classes."""

    # Handler name -> handler class, shared by the whole program.
    _handlers = {}

    @classmethod
    def register(cls, name):
        """Return a class decorator that registers the class under *name*.

        The handler name shall be unique, but don't worry if it's not, you'll
        be warned early enough: AssertionError is raised as soon as
        register() is called with a name that is already taken.
        """
        if name in cls._handlers:
            # Raised explicitly rather than through an 'assert' statement so
            # that the check is not stripped by "python -O".
            raise AssertionError("Handler already exists: %s" % name)

        def decorator(klass):
            cls._handlers[name] = klass
            return klass
        return decorator

    @classmethod
    def choices(cls):
        """Return the list of registered handler names, sorted by name.

        This method is used along with ArgParser.add_argument() to set the
        choices of the --sign-with option; sorting keeps the usage message
        stable from one run to the next.
        """
        return sorted(cls._handlers)

    @classmethod
    def get(cls, name):
        """Return the handler class registered under *name*.

        Raises ValueError when no handler has that name.
        """
        try:
            return cls._handlers[name]
        except KeyError:
            raise ValueError("Unknown handler: %s" % name)
@Stitch.register('none')
class NoneHandler(object):
    """Handler that stitches an image without signing it.

    The handler is a context manager: entering it creates a private working
    directory that holds copies of the stitcher configuration file and of the
    XML platform definition; leaving it removes that directory.
    """

    CHUNK_SIZE = 8192  # Size of blocks sent to sha.update().
    ALIGN_SIZE = 4     # Byte boundary for alignment.
    STITCH_HOME = os.path.join('./')
    STITCH_SHARE = os.path.join(STITCH_HOME)
    STITCH_CONFIG = os.path.join('./', 'ConfigFile_os.txt')
    STITCH_PROG = os.path.join('./', 'xfstk-stitcher')

    def __init__(self, image_file, platform_definition, platform_type=None):
        """Remember what to stitch; no file is touched until __enter__.

        image_file          -- pathname of the input image.
        platform_definition -- file name of the XML platform definition,
                               looked up in STITCH_SHARE.
        platform_type       -- value for the PlatformType configuration key,
                               or None to leave that key alone.
        """
        # image_files[0] is the input image. image_files[is_signed] is the
        # image handed to the stitcher, so whoever sets is_signed to True
        # must have appended the signed image first.
        self.image_files = [image_file]
        self.is_signed = False
        self.platform_definition = platform_definition
        self.platform_type = platform_type
        self.stitch_config_file = None
        self.stitch_platform_file = None
        self.workdir = None

    def __enter__(self):
        """Create the working directory and populate it for the stitcher."""
        self.workdir = tempfile.mkdtemp()
        try:
            # Create the configuration file and the definition file for the
            # stitcher.
            config_name = os.path.basename(self.STITCH_CONFIG)
            self.stitch_config_file = os.path.join(self.workdir, config_name)
            shutil.copy(self.STITCH_CONFIG, self.stitch_config_file)
            xml_template_file = file_pathname(
                os.path.join(self.STITCH_SHARE, self.platform_definition))
            self.stitch_platform_file = os.path.join(self.workdir,
                                                     self.platform_definition)
            shutil.copy(xml_template_file, self.stitch_platform_file)
        except Exception:
            # __exit__ is not called when __enter__ raises, so the working
            # directory has to be removed here or it would be leaked.
            shutil.rmtree(self.workdir, ignore_errors=True)
            raise
        return self

    def __exit__(self, type, value, traceback):
        """Remove the working directory; exceptions are not suppressed."""
        shutil.rmtree(self.workdir)

    def getsize(self):
        """Return the current size, in bytes, of the input image."""
        return os.path.getsize(self.image_files[0])

    def gethash(self):
        """Align the input image and return its SHA-256 digest as hex text.

        CAUTION: this modifies the input image in place. When its size is not
        a multiple of ALIGN_SIZE, 0xFF bytes are appended to it, prior to
        computing the digest.
        """
        image = self.image_files[0]
        # Number of padding bytes required to reach the next ALIGN_SIZE
        # boundary; 0 when the image is already aligned.
        padding_len = (-self.getsize()) % self.ALIGN_SIZE
        if padding_len:
            logging_debug("Adding %d padding bytes to align image", padding_len)
            with open(image, 'ab') as f:
                f.write(b'\xff' * padding_len)
        sha = hashlib.sha256()
        with open(image, 'rb') as f:
            for chunk in iter(lambda: f.read(self.CHUNK_SIZE), b''):
                sha.update(chunk)
        return sha.hexdigest()

    def set_stitch_info(self, **kwargs):
        """Update the stitcher definition and configuration files.

        The stitcher definition file shall be updated with the source image
        filename, i.e. the image file used by the stitcher to build the final
        image. If the image is signed, the first bit of image_attributes
        shall be reset as well.

        Each keyword argument whose value is not None names a key of the
        configuration file: the text following "key =" is replaced with the
        value wherever that pattern occurs.
        """
        logging_debug("Updating stitching information...")
        elem_tree = ElementTree.parse(self.stitch_platform_file)
        tree_root = elem_tree.getroot()
        if self.is_signed:
            for elem in tree_root.findall('./osip_header/os_image/image_attributes'):
                elem.text = str(int(elem.text) & (~1))
        for elem in tree_root.findall('./osip_header/os_image/image_filepath'):
            elem.text = self.image_files[self.is_signed]
        # Let ElementTree open the file by name: it picks the right mode
        # itself, whereas a text-mode file object is rejected on Python 3.
        elem_tree.write(self.stitch_platform_file)

        with open(self.stitch_config_file, 'r') as f:
            config = f.read()
        for k, v in kwargs.items():
            if v is None:
                continue
            logging_debug(" %s = %s", k, v)
            # A function replacement inserts the value verbatim. Given as a
            # replacement string, any backslash in the value would be
            # interpreted as an escape or a group reference.
            tail = ' %s' % v
            config = re.sub(r'(%s\s*=).*' % k,
                            lambda match, tail=tail: match.group(1) + tail,
                            config)
        with open(self.stitch_config_file, 'w') as f:
            f.write(config)

    def stitch(self, output_file):
        """Run the stitcher to build *output_file*.

        The stitcher output is logged. RuntimeError, carrying the stitcher's
        error output, is raised when it exits with a non-zero status.
        """
        # The stitcher files refer to the output by name, so make the path
        # independent of the current working directory.
        output_file = os.path.abspath(output_file)
        self.set_stitch_info(ImageName=output_file,
                             PlatformType=self.platform_type,
                             # CAUTION: lowercase 'p' for platformXML
                             platformXML=self.stitch_platform_file)
        logging_debug("Stitching image %s...", self.image_files[self.is_signed])
        stitch_cmd = [self.STITCH_PROG,
                      '-k', self.stitch_platform_file,
                      '-c', self.stitch_config_file]
        retcode, out, err = _run(stitch_cmd)
        logging.info(out)
        if retcode:
            raise RuntimeError(err)
# XXX: THIS HANDLER SHALL BE REMOVED ONCE XFSTK-STITCHER SUPPORTS platformXML
# FOR ALL PLATFORMS
@Stitch.register('merrifield')
class MerrifieldHandler(NoneHandler):
    """Variant of NoneHandler that gives the stitcher its configuration only.

    The only difference with NoneHandler.stitch() is the stitcher command
    line: the '-k <platform file>' argument is left out.
    """

    def stitch(self, output_file):
        """Run the stitcher to build *output_file*.

        The stitcher output is logged. RuntimeError, carrying the stitcher's
        error output, is raised when it exits with a non-zero status.
        """
        absolute_output = os.path.abspath(output_file)
        self.set_stitch_info(ImageName=absolute_output,
                             PlatformType=self.platform_type,
                             # CAUTION: lowercase 'p' for platformXML
                             platformXML=self.stitch_platform_file)
        source_image = self.image_files[self.is_signed]
        logging_debug("Stitching image %s...", source_image)
        status, stitcher_out, stitcher_err = _run(
            [self.STITCH_PROG, '-c', self.stitch_config_file])
        logging.info(stitcher_out)
        if status:
            raise RuntimeError(stitcher_err)
@Stitch.register('isu')
class IsuHandler(NoneHandler):
    """Handler that signs the image with Intel Signing Utility, then stitches."""

    # CAUTION: resist the temptation to expand the ISU_HOME_ON_SIGNER path here,
    # e.g. through os.path.expanduser(): it's going to expand to the HOME
    # environment variable, or the current user's home directory, whereas it
    # shall be the SIGNER_LOGIN home directory on the SIGNER_SERVER.
    ISU_HOME = os.path.join('vendor', 'intel', 'tools/intel_signing_utility')
    ISU_HOME_ON_SIGNER = os.path.join('~', 'intel_signing_utility')
    ISU_PROG = os.path.join('isu_stream')
    ISU_CERT = os.path.join('c0_234.pem')

    def _sign(self):
        """Build the signed image in the working directory.

        ISU is called in stream mode: the image hash and size are sent to the
        child process, which gives back the VRL header. The mechanic is the
        same whether the image is signed locally or by a signing server,
        except that the program is run through ssh in the latter case.

        On success the signed image (VRL header followed by the image) is
        appended to image_files and is_signed is set.

        Raises EnvironmentError when the signer is requested but SIGNER_LOGIN
        or SIGNER_SERVER is missing, and RuntimeError when ISU fails.
        """
        if _SIGNATURE_BY_SIGNER:
            logging_debug("Signature is delegated to %s", _SIGNER)
            if _SIGNER is None:
                raise EnvironmentError("Please, set both environment variables: "
                                       "SIGNER_LOGIN, SIGNER_SERVER")
            # Update paths to what they are supposed to be on the server.
            isu_prog = os.path.join(self.ISU_HOME_ON_SIGNER, self.ISU_PROG)
            cert_file = os.path.join(self.ISU_HOME_ON_SIGNER, self.ISU_CERT)
            isu_cmd = ['ssh', _SIGNER, isu_prog, '-k', cert_file]
        else:
            logging_debug("Signature is performed on localhost")
            isu_prog = os.path.join(self.ISU_HOME, self.ISU_PROG)
            cert_file = os.path.join(self.ISU_HOME, self.ISU_CERT)
            isu_cmd = [isu_prog, '-k', cert_file]

        # Order matters: gethash() may append padding to the image, so the
        # size shall only be read afterwards.
        img_hash = self.gethash()
        img_size = self.getsize()
        logging_debug("Image information: size=%X hash=%s", img_size, img_hash)
        # The request is plain ASCII; encode it so that the child's stdin pipe
        # is fed bytes on every Python version.
        request = ("%s\n%X\n" % (img_hash, img_size)).encode('ascii')
        retcode, signature, err = _run(isu_cmd, input=request)
        logging.info(err)  # Output redirected to stderr for streaming purpose.
        if retcode:
            raise RuntimeError("Failed to sign image")

        signed_file = os.path.basename(self.image_files[0]) + '.sgn'
        signed_file = os.path.join(self.workdir, signed_file)
        with open(signed_file, 'wb') as dst:
            dst.write(signature)
            # Stream the image after the header: the source file is closed
            # deterministically and never loaded whole in memory.
            with open(self.image_files[0], 'rb') as src:
                shutil.copyfileobj(src, dst)
        self.is_signed = True
        self.image_files.append(signed_file)

    def stitch(self, output_file):
        """Sign an image using Intel Signing Utility, then stitch it.

        The image is only signed when a signature was requested through the
        environment; otherwise it is stitched as is.
        """
        if _SIGNATURE_REQUESTED:
            self._sign()
        super(IsuHandler, self).stitch(output_file)
def main():
    """Parse the command line, then sign and stitch the requested image.

    Returns the exit status of the script: 0 on success, 1 when signing or
    stitching raised an exception (the error is logged). Command-line errors
    are reported by argparse itself, outside of that error handling.
    """
    logging.basicConfig(format=("%(filename)s: %(levelname)s: %(message)s"),
                        level=logging.INFO)

    cmd_parser = argparse.ArgumentParser()
    cmd_parser.add_argument('--input', type=file_pathname, required=True,
                            dest='input_file', metavar='input-pname',
                            help="Input image file")
    cmd_parser.add_argument('--output', required=True,
                            dest='output_file', metavar='output-pname',
                            help="Output image file")
    # Only the file name of the definition is kept: any directory part given
    # on the command line is dropped by os.path.basename.
    cmd_parser.add_argument('--xml', type=os.path.basename, required=True,
                            dest='definition', metavar='xml-name',
                            help="XML platform definition")
    cmd_parser.add_argument('--platform-type',
                            dest='type', metavar='platform-type',
                            help="Platform type")
    cmd_parser.add_argument('--sign-with', choices=Stitch.choices(), default='none',
                            dest='handler_name', metavar='signing-method',
                            help="Signing method to use")
    args = cmd_parser.parse_args()

    try:
        # XXX: THIS TEST SHALL BE REMOVED ONCE XFSTK-STITCHER SUPPORTS
        # platformXML FOR ALL PLATFORMS
        if args.type == "MRFLDA0":
            args.handler_name = 'merrifield'
        stitch_handler = Stitch.get(args.handler_name)
        with stitch_handler(args.input_file, args.definition, args.type) as h:
            h.stitch(args.output_file)
    except Exception as e:
        logging.error(e)
        return 1
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == '__main__':
    exit_status = main()
    sys.exit(exit_status)