# blob: 812bcbd7b8b02a4ce8e32db3c98d45e9a27547c2 [file] [log] [blame]
# ex:ts=4:sw=4:sts=4:et
# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
#
# BitBake Test for lib/bb/persist_data/
#
# Copyright (C) 2018 Garmin Ltd.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
import unittest
import bb.data
import bb.persist_data
import tempfile
import threading
class PersistDataTest(unittest.TestCase):
    """Tests for the dict-style store returned by ``bb.persist_data.persist``.

    Each test runs against a fresh temporary ``PERSISTENT_DIR`` whose
    ``TEST_PERSIST_DATA`` domain is pre-populated with ``self.items``.
    """

    def _create_data(self):
        """Return a new handle on the ``TEST_PERSIST_DATA`` domain."""
        return bb.persist_data.persist('TEST_PERSIST_DATA', self.d)

    def setUp(self):
        """Create the datastore, the temporary directory and the fixture rows."""
        self.d = bb.data.init()
        self.tempdir = tempfile.TemporaryDirectory()
        # Register the cleanup immediately: unittest skips tearDown() when
        # setUp() raises, but registered cleanups still run.
        self.addCleanup(self.tempdir.cleanup)
        self.d['PERSISTENT_DIR'] = self.tempdir.name
        self.data = self._create_data()
        self.items = {
            'A1': '1',
            'B1': '2',
            'C2': '3',
        }
        self.stress_count = 10000
        self.thread_count = 5
        for key, value in self.items.items():
            self.data[key] = value

    def _iter_helper(self, seen, iterator):
        """Assert that ``iterator`` yields exactly the members of ``seen``.

        ``seen`` is consumed: every yielded value is removed from it, so a
        value yielded twice or an unexpected value fails the test, and any
        leftover member is reported as not seen.
        """
        # Bind the iterator once so that the object guarded by the ``with``
        # statement is the same one that is consumed. Iterating ``iterator``
        # directly would create a second, unguarded iterator whenever a
        # container (rather than an iterator) is passed in.
        it = iter(iterator)
        with it:
            for value in it:
                self.assertIn(value, seen)
                seen.remove(value)
        self.assertEqual(len(seen), 0, '%s not seen' % seen)

    def test_get(self):
        """Stored keys are readable; a missing key gives None/KeyError."""
        for key, value in self.items.items():
            self.assertEqual(self.data[key], value)
        self.assertIsNone(self.data.get('D'))
        with self.assertRaises(KeyError):
            self.data['D']

    def test_set(self):
        """In-place updates of existing keys are stored."""
        for key in self.items:
            self.data[key] += '-foo'
        for key, value in self.items.items():
            self.assertEqual(self.data[key], value + '-foo')

    def test_delete(self):
        """A deleted key is no longer retrievable."""
        self.data['D'] = '4'
        self.assertEqual(self.data['D'], '4')
        del self.data['D']
        self.assertIsNone(self.data.get('D'))
        with self.assertRaises(KeyError):
            self.data['D']

    def test_contains(self):
        """``in`` and ``has_key`` agree for present and absent keys."""
        for key in self.items:
            self.assertIn(key, self.data)
            self.assertTrue(self.data.has_key(key))
        self.assertNotIn('NotFound', self.data)
        self.assertFalse(self.data.has_key('NotFound'))

    def test_len(self):
        """``len`` reports the number of stored entries."""
        self.assertEqual(len(self.data), len(self.items))

    def test_iter(self):
        """Iterating the store yields every key exactly once."""
        self._iter_helper(set(self.items), self.data)

    def test_itervalues(self):
        """``itervalues`` yields every stored value exactly once."""
        self._iter_helper(set(self.items.values()), self.data.itervalues())

    def test_iteritems(self):
        """``iteritems`` yields every (key, value) pair exactly once."""
        self._iter_helper(set(self.items.items()), self.data.iteritems())

    def test_get_by_pattern(self):
        """``get_by_pattern('_1')`` yields the values stored under A1 and B1."""
        # NOTE(review): presumably ``_`` is a single-character wildcard in the
        # pattern syntax -- confirm against bb.persist_data.
        self._iter_helper({'1', '2'}, self.data.get_by_pattern('_1'))

    def _stress_read(self, data):
        """Read every fixture key from ``data`` ``self.stress_count`` times."""
        for _ in range(self.stress_count):
            for key in self.items:
                data[key]

    def _stress_write(self, data):
        """Rewrite every fixture key ``self.stress_count`` times.

        On iteration ``i`` each key is set to its fixture value followed by
        ``str(i)``.
        """
        for i in range(self.stress_count):
            for key, value in self.items.items():
                data[key] = value + str(i)

    def _validate_stress(self):
        """Assert each key holds the value written by the last stress pass."""
        suffix = str(self.stress_count - 1)
        for key, value in self.items.items():
            self.assertEqual(self.data[key], value + suffix)

    def test_stress(self):
        """Sequential read pass followed by a write pass on one handle."""
        self._stress_read(self.data)
        self._stress_write(self.data)
        self._validate_stress()

    def test_stress_threads(self):
        """Concurrent readers and writers, each using its own handle."""
        # threading.Thread only prints an uncaught exception; collect them so
        # that a failing worker fails the test instead of passing silently.
        errors = []

        def worker(stress):
            try:
                stress(self._create_data())
            except Exception as exc:
                errors.append(exc)

        threads = []
        for _ in range(self.thread_count):
            threads.append(threading.Thread(target=worker,
                                            args=(self._stress_read,)))
            threads.append(threading.Thread(target=worker,
                                            args=(self._stress_write,)))

        for thread in threads:
            thread.start()

        try:
            # Keep the main thread's handle busy while the workers run.
            self._stress_read(self.data)
        finally:
            # Always wait for the workers so none is still using the
            # temporary directory when it is removed.
            for thread in threads:
                thread.join()

        self.assertEqual(errors, [],
                         '%d worker thread(s) failed: %r' % (len(errors), errors))
        self._validate_stress()