blob: 2d358f7172eee7083501e5d8d0d0b71a010351b7 [file] [log] [blame]
Brad Bishopc342db32019-05-15 21:57:59 -04001#
2# SPDX-License-Identifier: MIT
3#
4
Brad Bishop6e60e8b2018-02-01 10:27:11 -05005import os
6import json
7import shutil
8
9from oeqa.core.utils.test import getCaseFile, getCaseMethod
10
def get_package_manager(d, root_path):
    """
    Returns an OE package manager that can install packages in root_path.

    The backend is chosen from the IMAGE_PKGTYPE variable read from the
    datastore d: "rpm", "ipk" or "deb".  Before the manager is returned
    its package index is written and update() is called on it.

    Raises ValueError if IMAGE_PKGTYPE is not one of the supported types.
    """
    pkg_class = d.getVar("IMAGE_PKGTYPE")

    # Reject unknown package types up front: without this check no branch
    # below would bind pm and the function would fail later with an
    # UnboundLocalError that says nothing about the real cause.
    if pkg_class not in ("rpm", "ipk", "deb"):
        raise ValueError("Unsupported IMAGE_PKGTYPE %r; expected "
                         "'rpm', 'ipk' or 'deb'" % (pkg_class,))

    from oe.package_manager import RpmPM, OpkgPM, DpkgPM

    if pkg_class == "rpm":
        pm = RpmPM(d,
                   root_path,
                   d.getVar('TARGET_VENDOR'),
                   filterbydependencies=False)
        pm.create_configs()

    elif pkg_class == "ipk":
        pm = OpkgPM(d,
                    root_path,
                    d.getVar("IPKGCONF_TARGET"),
                    d.getVar("ALL_MULTILIB_PACKAGE_ARCHS"),
                    filterbydependencies=False)

    else:
        # Only "deb" is left after the validation above.
        pm = DpkgPM(d,
                    root_path,
                    d.getVar('PACKAGE_ARCHS'),
                    d.getVar('DPKG_ARCH'),
                    filterbydependencies=False)

    pm.write_index()
    pm.update()

    return pm
Brad Bishop6e60e8b2018-02-01 10:27:11 -050043
def find_packages_to_extract(test_suite):
    """
    Collects the packages that the runtime tests of a suite ask for.

    Every distinct case file of test_suite is checked for a companion
    JSON file; the contents of the files found are merged into one dict,
    which is returned.
    """
    from oeqa.core.utils.test import getSuiteCasesFiles

    collected = {}

    for case_file in set(getSuiteCasesFiles(test_suite)):
        manifest = _get_json_file(case_file)
        if not manifest:
            # No JSON file next to this case file, nothing to add.
            continue
        collected.update(_get_needed_packages(manifest))

    return collected
59
def _get_json_file(module_path):
    """
    Returns the path of the JSON file for a module, empty if it doesn't exist.

    The JSON file is looked up next to the module: same path with the
    file extension replaced by ".json".  An empty string is also returned
    when module_path itself is not an existing file.
    """

    # splitext only strips an extension from the last path component, so a
    # dot in a directory name can never truncate the path.
    json_file = os.path.splitext(module_path)[0] + '.json'
    if os.path.isfile(module_path) and os.path.isfile(json_file):
        return json_file
    return ''
70
def _get_needed_packages(json_file, test=None):
    """
    Loads the package requirements stored in a JSON file.

    Without a test the whole mapping found in the file is returned.  With
    a test only the entry stored under that name is returned, or an empty
    dict when the file has no such entry.
    """
    with open(json_file) as handle:
        per_test = {name: spec for name, spec in json.load(handle).items()}

    if not test:
        return per_test

    return per_test.get(test, {})
91
def extract_packages(d, needed_packages):
    """
    Extract packages that will be needed during runtime.

    needed_packages is a dict whose values are either one package
    description (a dict) or a list of them; the key is only used in the
    error message.  A description must have a 'pkg' entry and may have an
    'extract' entry (default True).

    Packages with 'extract' set are extracted and copied to
    TEST_EXTRACTED_DIR/<pkg>, unless that path already exists.  The
    others are handed to _copy_package().

    bb.fatal() is called when a value is neither a dict nor a list.
    """

    import bb
    import oe.path

    extracted_path = d.getVar('TEST_EXTRACTED_DIR')

    for key, value in needed_packages.items():
        packages = ()
        if isinstance(value, dict):
            packages = (value, )
        elif isinstance(value, list):
            packages = value
        else:
            bb.fatal('Failed to process needed packages for %s; '
                     'Value must be a dict or list' % key)

        for package in packages:
            pkg = package['pkg']

            if not package.get('extract', True):
                _copy_package(d, pkg)
                continue

            dst_dir = os.path.join(extracted_path, pkg)
            # Same package used for more than one test,
            # don't need to extract again.
            if os.path.exists(dst_dir):
                continue

            # Extract package and copy it to TEST_EXTRACTED_DIR
            pkg_dir = _extract_in_tmpdir(d, pkg)
            try:
                oe.path.copytree(pkg_dir, dst_dir)
            finally:
                # Remove the temporary extraction directory even when
                # the copy fails, so it is not left behind.
                shutil.rmtree(pkg_dir)
133
def _extract_in_tmpdir(d, pkg):
    """
    Extracts pkg, without dependencies, and returns the path of the
    temporary directory that holds the extracted content.

    The package manager root created under TEST_INSTALL_TMP_DIR for the
    extraction is removed before returning.
    """

    from oeqa.utils.package_manager import get_package_manager

    install_root = os.path.join(d.getVar('TEST_INSTALL_TMP_DIR'), pkg)
    manager = get_package_manager(d, install_root)
    unpacked_dir = manager.extract(pkg)
    shutil.rmtree(install_root)

    return unpacked_dir
148
def _copy_package(d, pkg):
    """
    Copy the RPM, DEB or IPK file of pkg to TEST_PACKAGED_DIR.

    The file is located through the 'filepath' entry of the package
    manager's package_info() result.  The package manager root created
    under TEST_INSTALL_TMP_DIR is removed afterwards.
    """

    from oeqa.utils.package_manager import get_package_manager

    install_root = os.path.join(d.getVar('TEST_INSTALL_TMP_DIR'), pkg)
    packaged_dir = d.getVar('TEST_PACKAGED_DIR')
    manager = get_package_manager(d, install_root)
    package_file = manager.package_info(pkg)[pkg]['filepath']
    shutil.copy2(package_file, packaged_dir)
    shutil.rmtree(install_root)
163
def install_package(test_case):
    """
    Installs in the DUT the packages that test_case asks for, if any.
    """
    required = test_needs_package(test_case)
    if not required:
        return
    _install_uninstall_packages(required, test_case, True)
171
def uninstall_package(test_case):
    """
    Uninstalls from the DUT the packages that test_case asks for, if any.
    """
    required = test_needs_package(test_case)
    if not required:
        return
    _install_uninstall_packages(required, test_case, False)
179
def test_needs_package(test_case):
    """
    Returns the packages a test case needs installed/uninstalled.

    The JSON file that belongs to the test case's file is looked up and
    the entry for the test method is read from it.  None is returned
    when there is no JSON file or no (non-empty) entry for the method.
    """
    json_file = _get_json_file(getCaseFile(test_case))
    if not json_file:
        return None

    required = _get_needed_packages(json_file, getCaseMethod(test_case))
    return required if required else None
194
def _install_uninstall_packages(needed_packages, test_case, install=True):
    """
    Install/Uninstall packages in the DUT without using a package manager

    needed_packages is one package description (a dict) or a list of
    them.  A description must have a 'pkg' entry and may have 'rm'
    (default False) and 'extract' (default True) entries.

    When install is true, every package with 'extract' set has its
    directory under test_case.tc.extract_dir copied to '/' on the
    target.  When install is false, every package with 'rm' set has that
    directory structure deleted from '/' on the target.

    Raises TypeError if needed_packages is neither a dict nor a list.
    """

    if isinstance(needed_packages, dict):
        packages = [needed_packages]
    elif isinstance(needed_packages, list):
        packages = needed_packages
    else:
        # Fail with a clear message instead of leaving packages unbound.
        raise TypeError('Failed to process needed packages; '
                        'value must be a dict or list, got %s'
                        % type(needed_packages).__name__)

    for package in packages:
        pkg = package['pkg']
        rm = package.get('rm', False)
        extract = package.get('extract', True)
        src_dir = os.path.join(test_case.tc.extract_dir, pkg)

        # Install package
        if install and extract:
            test_case.tc.target.copyDirTo(src_dir, '/')

        # Uninstall package
        elif not install and rm:
            test_case.tc.target.deleteDirStructure(src_dir, '/')
217 test_case.tc.target.deleteDirStructure(src_dir, '/')