Глубокое погружение в исходный код moto

версия moto: 3.1.x

Amazon S3

Ведро

用FakeBucket表示每個新增的bucket。

https://github.com/spulec/moto/blob/3.1.6/moto/s3/models.py#L847

如何實作create_bucket

每個bucket都會對應一個FakeBucket instance。

並且用buckets(dict object)紀錄每個的bucket對應的instance。

def create_bucket(self, bucket_name, region_name):
    ...
    new_bucket = FakeBucket(name=bucket_name, region_name=region_name)

    self.buckets[bucket_name] = new_bucket
    return new_bucket
Вход в полноэкранный режим Выйти из полноэкранного режима

如何實作S3物件的多版本特性

_VersionedKeyStore 實現dict一個key可以儲存多個value,但是get method只會回傳最新的value。

https://github.com/spulec/moto/blob/3.1.6/moto/s3/utils.py#L102

Объект

用FakeKey表示每個新增的object。

https://github.com/spulec/moto/blob/3.1.6/moto/s3/models.py#L102

如何實作put_object

https://github.com/spulec/moto/blob/3.1.6/moto/s3/models.py#L1608

上傳的物件對應至參數value

如何實作delete_object

dictpop模擬刪除物件的行為。

https://github.com/spulec/moto/blob/3.1.6/moto/s3/models.py#L1989

moto如何避免吃光記憶體 — SpooledTemporaryFile

由SpooledTemporaryFile instance負責處理Python object的儲存方式。當物件大小超過max_size就會寫入硬碟。

https://docs.python.org/3/library/tempfile.html#tempfile.SpooledTemporaryFile

Этот класс работает точно так же, как и TemporaryFile(), за исключением того, что данные хранятся в памяти, пока размер файла не превысит max_size, или пока не будет вызван метод fileno() файла, после чего содержимое записывается на диск и работа продолжается как с TemporaryFile().


self._value_buffer = tempfile.SpooledTemporaryFile(self._max_buffer_size)

...

@property
def value(self):
    self.lock.acquire()
    self._value_buffer.seek(0)
    r = self._value_buffer.read()
    r = copy.copy(r)
    self.lock.release()
    return r

...

@value.setter
def value(self, new_value):
    self._value_buffer.seek(0)
    self._value_buffer.truncate()

    # Hack for working around moto's own unit tests; this probably won't
    # actually get hit in normal use.
    if isinstance(new_value, str):
        new_value = new_value.encode(DEFAULT_TEXT_ENCODING)
    self._value_buffer.write(new_value)
    self.contentsize = len(new_value)
Вход в полноэкранный режим Выход из полноэкранного режима

Оцените статью
devanswers.ru
Добавить комментарий