# blob: afd5b8e75f6dc2c65da224d5037e18558db06683 [file] [log] [blame]
import os
import json
import shutil

from oeqa.core.utils.test import getCaseFile, getCaseMethod

def get_package_manager(d, root_path):
    """
    Returns an OE package manager that can install packages in root_path.

    The backend is selected from the IMAGE_PKGTYPE variable read from d,
    which must be one of "rpm", "ipk" or "deb". Before the manager is
    returned its write_index() and update() methods are called.

    Raises ValueError if IMAGE_PKGTYPE holds any other value (including
    being unset).
    """
    pkg_class = d.getVar("IMAGE_PKGTYPE")

    # Validate up front: without this check an unexpected package type
    # would fall through every branch and fail later with an
    # UnboundLocalError on 'pm'.
    if pkg_class not in ("rpm", "ipk", "deb"):
        raise ValueError('Unsupported IMAGE_PKGTYPE %r; '
                         'expected one of: rpm, ipk, deb' % (pkg_class,))

    from oe.package_manager import RpmPM, OpkgPM, DpkgPM

    if pkg_class == "rpm":
        pm = RpmPM(d,
                   root_path,
                   d.getVar('TARGET_VENDOR'),
                   filterbydependencies=False)
        pm.create_configs()

    elif pkg_class == "ipk":
        pm = OpkgPM(d,
                    root_path,
                    d.getVar("IPKGCONF_TARGET"),
                    d.getVar("ALL_MULTILIB_PACKAGE_ARCHS"))

    else:
        # Only "deb" can reach this branch after the validation above.
        pm = DpkgPM(d,
                    root_path,
                    d.getVar('PACKAGE_ARCHS'),
                    d.getVar('DPKG_ARCH'))

    pm.write_index()
    pm.update()

    return pm

def find_packages_to_extract(test_suite):
    """
    Collects the packages that the runtime tests of test_suite require.

    Every distinct case file of the suite is checked for a companion
    JSON file; the contents of each one found are merged into a single
    dict, which is returned.
    """
    from oeqa.core.utils.test import getSuiteCasesFiles

    collected = {}

    # De-duplicate so a file shared by several cases is read only once.
    for case_file in set(getSuiteCasesFiles(test_suite)):
        json_path = _get_json_file(case_file)
        if not json_path:
            continue
        collected.update(_get_needed_packages(json_path))

    return collected

def _get_json_file(module_path):
    """
    Returns the path of the JSON file for a module, empty if it doesn't exist.

    The JSON file is expected next to the module, with the module's file
    extension replaced by ".json". An empty string is returned when
    either the module or the JSON file is not an existing regular file.
    """

    # splitext only considers the last path component, so a dot in a
    # directory name cannot be mistaken for the extension separator.
    json_file = os.path.splitext(module_path)[0] + '.json'
    if os.path.isfile(module_path) and os.path.isfile(json_file):
        return json_file
    return ''

def _get_needed_packages(json_file, test=None):
    """
    Returns a dict with needed packages based on a JSON file.

    The top level of the JSON file must be an object; its keys are
    looked up by test name.

    If a test is specified it will return just the entry for that test,
    or an empty dict when the file has no entry for it.
    """
    # JSON is read as UTF-8 explicitly so the result does not depend on
    # the locale of the process.
    with open(json_file, encoding='utf-8') as f:
        test_packages = json.load(f)

    needed_packages = dict(test_packages.items())

    if test:
        return needed_packages.get(test, {})

    return needed_packages

def extract_packages(d, needed_packages):
    """
    Extract packages that will be needed during runtime.

    needed_packages maps a key to either a single package description
    (a dict) or a list of them. Each description must have a 'pkg'
    entry and may have an 'extract' entry (default True):

      - extract true:  the package is extracted and its contents copied
        to TEST_EXTRACTED_DIR/<pkg>; skipped if that path already exists.
      - extract false: the package file itself is copied by
        _copy_package().

    Any other value type is reported through bb.fatal().
    """

    import bb
    import oe.path

    extracted_path = d.getVar('TEST_EXTRACTED_DIR')

    for key, value in needed_packages.items():
        packages = ()
        if isinstance(value, dict):
            packages = (value, )
        elif isinstance(value, list):
            packages = value
        else:
            bb.fatal('Failed to process needed packages for %s; '
                     'Value must be a dict or list' % key)

        for package in packages:
            pkg = package['pkg']

            if not package.get('extract', True):
                # Package is wanted as a file, not as extracted content.
                _copy_package(d, pkg)
                continue

            dst_dir = os.path.join(extracted_path, pkg)
            # Same package used for more than one test,
            # don't need to extract again.
            if os.path.exists(dst_dir):
                continue

            # Extract package and copy it to TEST_EXTRACTED_DIR
            pkg_dir = _extract_in_tmpdir(d, pkg)
            try:
                oe.path.copytree(pkg_dir, dst_dir)
            finally:
                # Remove the temporary extraction even if the copy fails.
                shutil.rmtree(pkg_dir)

def _extract_in_tmpdir(d, pkg):
    """
    Returns path to a temp directory where the package was
    extracted without dependencies.

    A package manager is set up under TEST_INSTALL_TMP_DIR/<pkg> for the
    extraction; that directory is removed again before returning,
    whether or not the extraction succeeded.
    """

    from oeqa.utils.package_manager import get_package_manager

    pkg_path = os.path.join(d.getVar('TEST_INSTALL_TMP_DIR'), pkg)
    pm = get_package_manager(d, pkg_path)
    try:
        extract_dir = pm.extract(pkg)
    finally:
        # Do not leave the package manager root behind on failure.
        shutil.rmtree(pkg_path)

    return extract_dir

def _copy_package(d, pkg):
    """
    Copy the RPM, DEB or IPK package file to TEST_PACKAGED_DIR.

    A package manager is set up under TEST_INSTALL_TMP_DIR/<pkg> to look
    up the package file path; that directory is removed again
    afterwards, whether or not the lookup and copy succeeded.
    """

    from oeqa.utils.package_manager import get_package_manager

    pkg_path = os.path.join(d.getVar('TEST_INSTALL_TMP_DIR'), pkg)
    dst_dir = d.getVar('TEST_PACKAGED_DIR')
    pm = get_package_manager(d, pkg_path)
    try:
        pkg_info = pm.package_info(pkg)
        file_path = pkg_info[pkg]['filepath']
        shutil.copy2(file_path, dst_dir)
    finally:
        # Do not leave the package manager root behind on failure.
        shutil.rmtree(pkg_path)

def install_package(test_case):
    """
    Installs in the DUT the packages that test_case declares, if any.
    """
    packages = test_needs_package(test_case)
    if not packages:
        return
    _install_uninstall_packages(packages, test_case, True)

def uninstall_package(test_case):
    """
    Uninstalls from the DUT the packages that test_case declares, if any.
    """
    packages = test_needs_package(test_case)
    if not packages:
        return
    _install_uninstall_packages(packages, test_case, False)

def test_needs_package(test_case):
    """
    Returns the packages a test case needs installed/uninstalled.

    The result is the entry for the test method found in the JSON file
    that accompanies the test case file, or None when there is no such
    file or the entry is missing or empty.
    """
    json_path = _get_json_file(getCaseFile(test_case))
    if not json_path:
        return None

    method_name = getCaseMethod(test_case)
    # Normalise any empty result to None.
    return _get_needed_packages(json_path, method_name) or None

def _install_uninstall_packages(needed_packages, test_case, install=True):
    """
    Install/Uninstall packages in the DUT without using a package manager

    needed_packages is a single package description (a dict) or a list
    of them. Each description must have a 'pkg' entry and may have
    'extract' (default True) and 'rm' (default False) entries.

    When install is true, every package with 'extract' set has its
    directory under test_case.tc.extract_dir copied to '/' on the
    target. When install is false, every package with 'rm' set has that
    directory structure deleted from '/' on the target.

    Raises TypeError if needed_packages is neither a dict nor a list.
    """

    if isinstance(needed_packages, dict):
        packages = [needed_packages]
    elif isinstance(needed_packages, list):
        packages = needed_packages
    else:
        # Fail with a clear message instead of an UnboundLocalError.
        raise TypeError('needed_packages must be a dict or list, not %s'
                        % type(needed_packages).__name__)

    for package in packages:
        pkg = package['pkg']
        rm = package.get('rm', False)
        extract = package.get('extract', True)
        src_dir = os.path.join(test_case.tc.extract_dir, pkg)

        if install:
            # Install package
            if extract:
                test_case.tc.target.copyDirTo(src_dir, '/')
        elif rm:
            # Uninstall package
            test_case.tc.target.deleteDirStructure(src_dir, '/')