diff --git a/albertson/base.py b/albertson/base.py index 5615b69..ad0f7cb 100644 --- a/albertson/base.py +++ b/albertson/base.py @@ -206,7 +206,7 @@ def name(self): @property def count(self): - print self.dynamo_item + print(self.dynamo_item) return self.dynamo_item['count'] @property diff --git a/albertson/base.py.bak b/albertson/base.py.bak new file mode 100644 index 0000000..5615b69 --- /dev/null +++ b/albertson/base.py.bak @@ -0,0 +1,236 @@ +from datetime import datetime + +import boto +from boto.dynamodb.exceptions import DynamoDBKeyNotFoundError + +ISO_FORMAT = '%Y-%m-%dT%H:%M:%S' + + +class CounterPool(object): + ''' + Handles schema level interactions with DynamoDB and generates individual + counters as needed. + ''' + table_name = None + schema = { + 'hash_key_name': 'counter_name', + 'hash_key_proto_value': 'S', + } + read_units = 3 + write_units = 5 + + def __init__(self, aws_access_key=None, aws_secret_key=None, table_name=None, schema=None, read_units=None, write_units=None, auto_create_table=True, ): + """ + :aws_access_key: + AWS Acccess Key ID with permissions to use DynamoDB + :aws_secret_key: + AWS Access Secret Key for the given Access Key ID + :table_name: + The DynamoDB table that should be used to store this pool's + counters. See http://bit.ly/DynamoDBModel for details on + DynamoDB's data model. + :schema: + The schema that will be used to create a table if one does not + already exist. See the `boto`_ + docs for details on what's expected for a schema. + :read_units: + Read throughput to be set when a table is created. See + http://bit.ly/DynamoThoughput for details on Dynamo's provisioned + throughput system. + :write_units: + Write throughput to be set when a table is created. + :auto_create_table: + Should Albertson create a dynamodb table if the provided + `table_name` doesn't exist. + """ + self.conn = self.get_conn(aws_access_key, aws_secret_key) + self.table_name = table_name or self.table_name + self.schema = schema or self.schema + self.read_units = read_units or self.read_units + self.write_units = write_units or self.write_units + self.auto_create_table = auto_create_table + + super(CounterPool, self).__init__() + + def get_conn(self, aws_access_key=None, aws_secret_key=None): + ''' + Hook point for overriding how the CounterPool gets its connection to + AWS. + ''' + return boto.connect_dynamodb( + aws_access_key_id=aws_access_key, + aws_secret_access_key=aws_secret_key, + ) + + def get_table_name(self): + ''' + Hook point for overriding how the CounterPool determines the table name + to use. + ''' + if not self.table_name: + raise NotImplementedError( + 'You must provide a table_name value or override the get_table_name method' + ) + return self.table_name + + def get_schema(self): + ''' + Hook point for overriding how the CounterPool determines the schema + to be used when creating a missing table. + ''' + if not self.schema: + raise NotImplementedError( + 'You must provide a schema value or override the get_schema method' + ) + + return self.conn.create_schema(**self.schema) + + def get_read_units(self): + ''' + Hook point for overriding how the CounterPool determines the read + throughput units to set on a newly created table. + ''' + return self.read_units + + def get_write_units(self): + ''' + Hook point for overriding how the CounterPool determines the write + throughput units to set on a newly created table. + ''' + return self.write_units + + def create_table(self): + ''' + Hook point for overriding how the CounterPool creates a new table + in DynamooDB + ''' + table = self.conn.create_table( + name=self.get_table_name(), + schema=self.get_schema(), + read_units=self.get_read_units(), + write_units=self.get_write_units(), + ) + + if table.status != 'ACTIVE': + table.refresh(wait_for_active=True, retry_seconds=1) + + return table + + def get_table(self): + ''' + Hook point for overriding how the CounterPool transforms table_name + into a boto DynamoDB Table object. + ''' + if hasattr(self, '_table'): + table = self._table + else: + try: + table = self.conn.get_table(self.get_table_name()) + except boto.exception.DynamoDBResponseError: + if self.auto_create_table: + table = self.create_table() + else: + raise + + self._table = table + + return table + + def create_item(self, hash_key, start=0, extra_attrs=None): + ''' + Hook point for overriding how the CouterPool creates a DynamoDB item + for a given counter when an existing item can't be found. + ''' + table = self.get_table() + now = datetime.utcnow().replace(microsecond=0).isoformat() + attrs = { + 'created_on': now, + 'modified_on': now, + 'count': start, + } + + if extra_attrs: + attrs.update(extra_attrs) + + item = table.new_item( + hash_key=hash_key, + attrs=attrs, + ) + + return item + + def get_item(self, hash_key, start=0, extra_attrs=None): + ''' + Hook point for overriding how the CouterPool fetches a DynamoDB item + for a given counter. + ''' + table = self.get_table() + + try: + item = table.get_item(hash_key=hash_key) + except DynamoDBKeyNotFoundError: + item = None + + if item is None: + item = self.create_item( + hash_key=hash_key, + start=start, + extra_attrs=extra_attrs, + ) + + return item + + def get_counter(self, name, start=0): + ''' + Gets the DynamoDB item behind a counter and ties it to a Counter + instace. + ''' + item = self.get_item(hash_key=name, start=start) + counter = Counter(dynamo_item=item, pool=self) + + return counter + + +class Counter(object): + ''' + Interface to individual counters. + ''' + + def __init__(self, dynamo_item, pool): + self.dynamo_item = dynamo_item + self.pool = pool + + @property + def name(self): + return self.dynamo_item['counter_name'] + + @property + def count(self): + print self.dynamo_item + return self.dynamo_item['count'] + + @property + def created_on(self): + return datetime.strptime(self.dynamo_item['created_on'], ISO_FORMAT) + + @property + def modified_on(self): + return datetime.strptime(self.dynamo_item['modified_on'], ISO_FORMAT) + + def refresh(self): + self.dynamo_item = self.pool.get_item(hash_key=self.name) + + def increment(self, amount=1): + item = self.dynamo_item + item.add_attribute('count', amount) + item.put_attribute( + 'modified_on', + datetime.utcnow().replace(microsecond=0).isoformat() + ) + result = item.save(return_values='UPDATED_NEW') + item.update(result['Attributes']) + + return self.count + + def decrement(self, amount=1): + return self.increment(amount * -1) diff --git a/setup.py b/setup.py index d614446..c8455a7 100755 --- a/setup.py +++ b/setup.py @@ -1,7 +1,7 @@ #!/usr/bin/env python try: from setuptools import setup -except ImportError, err: +except ImportError as err: from distutils.core import setup from albertson import VERSION diff --git a/setup.py.bak b/setup.py.bak new file mode 100755 index 0000000..d614446 --- /dev/null +++ b/setup.py.bak @@ -0,0 +1,28 @@ +#!/usr/bin/env python +try: + from setuptools import setup +except ImportError, err: + from distutils.core import setup + +from albertson import VERSION + +setup( + name='Albertson', + version=".".join(map(str, VERSION)), + description="Easy to use, scalable, counters powered by Amazon's DynamoDB", + author="Sean O'Connor", + author_email="sean@focuslab.io", + url="https://github.com/FocusLab/Albertson", + packages=['albertson', 'albertson.dynamodb_utils'], + license="BSD", + long_description=open('README.md').read(), + install_requires=['boto>=2.2.2'], + classifiers=[ + 'Development Status :: 2 - Pre-Alpha', + 'Intended Audience :: Developers', + 'License :: OSI Approved :: BSD License', + 'Programming Language :: Python', + 'Topic :: Software Development :: Libraries :: Python Modules', + ], + test_suite='nose.collector', +) diff --git a/tests/base.py b/tests/base.py index f5627bd..00f59e2 100644 --- a/tests/base.py +++ b/tests/base.py @@ -372,8 +372,8 @@ def test_counter_increment(self): pool = self.get_pool() item = self.get_item() counter = pool.get_counter(item.hash_key) - print item - print counter.dynamo_item + print(item) + print(counter.dynamo_item) self.assertEqual(item, counter.dynamo_item) old_count = counter.count @@ -397,8 +397,8 @@ def test_counter_decrement(self): pool = self.get_pool() item = self.get_item() counter = pool.get_counter(item.hash_key, start=1) - print item - print counter.dynamo_item + print(item) + print(counter.dynamo_item) self.assertEqual(item, counter.dynamo_item) old_count = counter.count diff --git a/tests/base.py.bak b/tests/base.py.bak new file mode 100644 index 0000000..f5627bd --- /dev/null +++ b/tests/base.py.bak @@ -0,0 +1,417 @@ +from datetime import datetime +import unittest + +import boto +from boto.dynamodb.exceptions import DynamoDBKeyNotFoundError + +from mock import MagicMock, sentinel + +from testconfig import config + +from albertson import CounterPool +from albertson.dynamodb_utils.testing import dynamo_cleanup, DynamoDeleteMixin + +ISO_FORMAT = '%Y-%m-%dT%H:%M:%S' + + +class BaseCounterPoolTests(DynamoDeleteMixin, unittest.TestCase): + + def __init__(self, *args, **kwargs): + self.tables = {} + + super(BaseCounterPoolTests, self).__init__(*args, **kwargs) + + def get_connection(self): + conn = getattr(self, '_conn', None) + + if not conn: + conn = boto.connect_dynamodb( + aws_access_key_id=config['aws']['access_key'], + aws_secret_access_key=config['aws']['secret_key'], + ) + self._conn = conn + + return conn + + def get_schema(self, **kwargs): + conn = self.get_connection() + real_kwargs = { + 'hash_key_name': 'counter_name', + 'hash_key_proto_value': 'S', + } + real_kwargs.update(kwargs) + + return conn.create_schema(**real_kwargs) + + def get_table(self, table_name=None, schema_kwargs=None): + table_name = table_name or config['albertson']['test_table_name'] + table = self.tables.get(table_name, None) + + if table is None: + conn = self.get_connection() + + try: + table = conn.get_table(table_name) + except boto.exception.DynamoDBResponseError: + table = None + + if table is None: + table = self.create_table(table_name, schema_kwargs) + self.tables[table.name] = table + + return table + + def create_table(self, table_name=None, schema_kwargs=None): + table_name = table_name or config['albertson']['test_table_name'] + schema_kwargs = schema_kwargs or {} + + conn = self.get_connection() + schema = self.get_schema(**schema_kwargs) + + table = conn.create_table( + name=table_name, + schema=schema, + read_units=3, + write_units=5, + ) + + if table.status != 'ACTIVE': + table.refresh(wait_for_active=True, retry_seconds=1) + + return table + + def get_pool(self, pool_class=None, **kwargs): + pool_class = pool_class or CounterPool + real_kwargs = { + 'aws_access_key': config['aws']['access_key'], + 'aws_secret_key': config['aws']['secret_key'], + 'table_name': config['albertson']['test_table_name'], + 'auto_create_table': False, + } + real_kwargs.update(kwargs) + + return pool_class(**real_kwargs) + + def get_item(self, hash_key='test', attrs=None): + table = self.get_table() + real_attrs = { + 'count': 5, + 'created': '2012-01-02T23:32:13', + 'modified': '2012-01-02T24:33:23', + } + if attrs: + real_attrs.update(attrs) + + item = table.new_item( + hash_key=hash_key, + attrs=real_attrs, + ) + item.put() + + return item + + def test_base_init(self): + pool = self.get_pool() + + pool.conn.list_tables() + + def test_get_init_table_name(self): + pool = self.get_pool() + + expected = config['albertson']['test_table_name'] + result = pool.get_table_name() + + self.assertEquals(expected, result) + + def test_get_empty_table_name(self): + pool = self.get_pool(table_name='') + + with self.assertRaises(NotImplementedError): + pool.get_table_name() + + def test_get_attr_table_name(self): + class TestCounterPool(CounterPool): + table_name = 'some_name' + + pool = self.get_pool(pool_class=TestCounterPool, table_name=None) + + expected = 'some_name' + result = pool.get_table_name() + + self.assertEquals(expected, result) + + def test_get_default_schema(self): + pool = self.get_pool() + conn = self.get_connection() + + expected = conn.create_schema(**pool.schema).dict + result = pool.get_schema().dict + + self.assertEquals(expected, result) + self.assertIsNotNone(result) + + def test_get_init_schema(self): + schema_dict = {'hash_key_name': 'test', 'hash_key_proto_value': 'S'} + pool = self.get_pool(schema=schema_dict) + conn = self.get_connection() + + expected = conn.create_schema(**schema_dict).dict + result = pool.get_schema().dict + + self.assertEquals(expected, result) + + def test_get_empty_schema(self): + pool = self.get_pool() + pool.schema = None + + with self.assertRaises(NotImplementedError): + pool.get_schema() + + def test_create_table(self): + pool = self.get_pool(table_name='albertson-create-test') + + table = pool.create_table() + + table.refresh(wait_for_active=True, retry_seconds=1) + + self.assertEquals(table.status, 'ACTIVE') + + table.delete() + + @dynamo_cleanup() + def test_get_existing_table(self): + self.get_table() + pool = self.get_pool() + + assert pool.get_table() + + def test_get_missing_table_without_auto_create(self): + pool = self.get_pool(table_name='nonexistent') + + with self.assertRaises(boto.exception.DynamoDBResponseError): + pool.get_table() + + def test_get_missing_table_with_auto_create(self): + pool = self.get_pool(auto_create_table=True, table_name='nonexistent') + pool.create_table = MagicMock(name='create_table') + + expected = sentinel.table_return + pool.create_table.return_value = expected + + result = pool.get_table() + + pool.create_table.assert_called_with() + self.assertEquals(expected, result) + + @dynamo_cleanup() + def test_create_item(self): + hash_key = 'test' + table = self.get_table() + pool = self.get_pool(auto_create_table=True) + now = datetime.utcnow().replace(microsecond=0) + + expected = { + 'counter_name': hash_key, + 'count': 0, + } + result = pool.create_item(hash_key=hash_key) + + self.assertDictContainsSubset(expected, result) + + created_offset = datetime.strptime(result['created_on'], ISO_FORMAT) - now + modified_offset = datetime.strptime(result['modified_on'], ISO_FORMAT) - now + + self.assertLess(created_offset.seconds, 2) + self.assertGreaterEqual(created_offset.seconds, 0) + self.assertLess(modified_offset.seconds, 2) + self.assertGreaterEqual(modified_offset.seconds, 0) + + with self.assertRaises(DynamoDBKeyNotFoundError): + table.get_item(hash_key=hash_key, consistent_read=True) + + @dynamo_cleanup() + def test_create_item_with_extra_attrs(self): + hash_key = 'test' + table = self.get_table() + pool = self.get_pool(auto_create_table=True) + now = datetime.utcnow().replace(microsecond=0) + + expected = { + 'counter_name': hash_key, + 'count': 0, + 'foo': 'bar', + } + result = pool.create_item(hash_key=hash_key, extra_attrs={'foo': 'bar'}) + + self.assertDictContainsSubset(expected, result) + + created_offset = datetime.strptime(result['created_on'], ISO_FORMAT) - now + modified_offset = datetime.strptime(result['modified_on'], ISO_FORMAT) - now + + self.assertLess(created_offset.seconds, 2) + self.assertGreaterEqual(created_offset.seconds, 0) + self.assertLess(modified_offset.seconds, 2) + self.assertGreaterEqual(modified_offset.seconds, 0) + + with self.assertRaises(DynamoDBKeyNotFoundError): + table.get_item(hash_key=hash_key, consistent_read=True) + + @dynamo_cleanup() + def test_get_missing_item(self): + hash_key = 'test' + pool = self.get_pool(auto_create_table=True) + pool.create_item = MagicMock(name='create_item') + + expected = sentinel.item_return + pool.create_item.return_value = expected + + result = pool.get_item(hash_key) + + pool.create_item.assert_called_with(hash_key=hash_key, start=0, extra_attrs=None) + self.assertEquals(expected, result) + + @dynamo_cleanup() + def test_get_existing_item(self): + hash_key = 'test' + table = self.get_table() + expected = table.new_item( + hash_key=hash_key, + attrs={ + 'count': 5, + 'created': '2012-01-02T23:32:13', + 'modified': '2012-01-02T24:33:23', + } + ) + expected.put() + pool = self.get_pool() + + result = pool.get_item(hash_key) + + self.assertEqual(expected, result) + + @dynamo_cleanup() + def test_table_caching(self): + pool = self.get_pool() + pool._table = sentinel.cached_table + + expected = sentinel.cached_table + result = pool.get_table() + + self.assertEqual(expected, result) + + @dynamo_cleanup() + def test_get_counter(self): + pool = self.get_pool() + name = 'test' + + result = pool.get_counter(name) + + self.assertEqual(pool, result.pool) + + @dynamo_cleanup() + def test_counter_name(self): + + pool = self.get_pool() + expected = 'test' + + result = pool.get_counter(expected).name + + self.assertEquals(expected, result) + + @dynamo_cleanup() + def test_counter_count(self): + pool = self.get_pool() + name = 'test' + + expected = 0 + result = pool.get_counter(name).count + + self.assertEquals(expected, result) + + @dynamo_cleanup() + def test_counter_created_on(self): + pool = self.get_pool() + name = 'test' + counter = pool.get_counter(name) + + expected = counter.dynamo_item['created_on'] + result = counter.created_on.isoformat() + + self.assertEquals(expected, result) + + @dynamo_cleanup() + def test_counter_modified_on(self): + pool = self.get_pool() + name = 'test' + counter = pool.get_counter(name) + + expected = counter.dynamo_item['modified_on'] + result = counter.modified_on.isoformat() + + self.assertEquals(expected, result) + + @dynamo_cleanup() + def test_counter_refresh(self): + pool = self.get_pool() + expected = self.get_item() + counter = pool.get_counter(expected.hash_key) + + self.assertEqual(expected, counter.dynamo_item) + + expected.add_attribute('count', 3) + ret_val = expected.save(return_values='UPDATED_NEW') + expected.update(ret_val['Attributes']) + + counter.refresh() + + self.assertEqual(expected, counter.dynamo_item) + + @dynamo_cleanup() + def test_counter_increment(self): + table = self.get_table() + pool = self.get_pool() + item = self.get_item() + counter = pool.get_counter(item.hash_key) + print item + print counter.dynamo_item + + self.assertEqual(item, counter.dynamo_item) + old_count = counter.count + + now = datetime.utcnow().replace(microsecond=0) + counter.increment() + + expected_count = old_count + 1 + self.assertEqual(expected_count, counter.count) + + fetched_item = table.get_item(item.hash_key) + modified_offset = datetime.strptime(fetched_item['modified_on'], ISO_FORMAT) - now + self.assertEqual(expected_count, fetched_item['count']) + self.assertEqual(fetched_item, counter.dynamo_item) + self.assertLess(modified_offset.seconds, 2) + self.assertGreaterEqual(modified_offset.seconds, 0) + + @dynamo_cleanup() + def test_counter_decrement(self): + table = self.get_table() + pool = self.get_pool() + item = self.get_item() + counter = pool.get_counter(item.hash_key, start=1) + print item + print counter.dynamo_item + + self.assertEqual(item, counter.dynamo_item) + old_count = counter.count + + now = datetime.utcnow().replace(microsecond=0) + counter.decrement() + + expected_count = old_count - 1 + self.assertEqual(expected_count, counter.count) + + fetched_item = table.get_item(item.hash_key) + modified_offset = datetime.strptime(fetched_item['modified_on'], ISO_FORMAT) - now + self.assertEqual(expected_count, fetched_item['count']) + self.assertEqual(fetched_item, counter.dynamo_item) + self.assertLess(modified_offset.seconds, 2) + self.assertGreaterEqual(modified_offset.seconds, 0)