Вопрос по json, python, mongodb – Оптимизация: дамп JSON из потокового API в Mongo

8

Background: у меня естьpython модуль настроен на захват объектов JSON из потокового API и сохранение их (массовая вставка по 25 за раз) в MongoDB с использованием pymongo. Для сравнения, у меня также есть команда bash дляcurl из того же потокового API иpipe это кmongoimport, Оба эти подхода хранят данные в отдельных коллекциях.

Периодически я отслеживаюcount() из коллекций, чтобы проверить, как они живут.

Пока я вижуpython модуль отстает примерно на 1000 объектов JSON заcurl | mongoimport подход.

Problem: Как я могу оптимизировать мойpython модуль должен синхронизироваться с curl | mongoimport?

Я не могу использоватьtweetstream так как я не использую Twitter API, а сторонний сервис потоковой передачи.

Может ли кто-нибудь помочь мне здесь?

Python module:


class StreamReader:
    def __init__(self):
        try:
            self.buff = ""
            self.tweet = ""
            self.chunk_count = 0
            self.tweet_list = []
            self.string_buffer = cStringIO.StringIO()
            self.mongo = pymongo.Connection(DB_HOST)
            self.db = self.mongo[DB_NAME]
            self.raw_tweets = self.db["raw_tweets_gnip"]
            self.conn = pycurl.Curl()
            self.conn.setopt(pycurl.ENCODING, 'gzip')
            self.conn.setopt(pycurl.URL, STREAM_URL)
            self.conn.setopt(pycurl.USERPWD, AUTH)
            self.conn.setopt(pycurl.WRITEFUNCTION, self.handle_data)
            self.conn.perform()
        except Exception as ex:
            print "error ocurred : %s" % str(ex)

    def handle_data(self, data):
        try:
            self.string_buffer = cStringIO.StringIO(data)
            for line in self.string_buffer:
                try:
                    self.tweet = json.loads(line)
                except Exception as json_ex:
                    print "JSON Exception occurred: %s" % str(json_ex)
                    continue

                if self.tweet:
                    try:
                        self.tweet_list.append(self.tweet)
                        self.chunk_count += 1
                        if self.chunk_count % 1000 == 0
                            self.raw_tweets.insert(self.tweet_list)
                            self.chunk_count = 0
                            self.tweet_list = []

                    except Exception as insert_ex:
                        print "Error inserting tweet: %s" % str(insert_ex)
                        continue
        except Exception as ex:
            print "Exception occurred: %s" % str(ex)
            print repr(self.buff)

    def __del__(self):
        self.string_buffer.close()

Спасибо за прочтение.

как выглядит ваш скрипт / команда bash curl? Asya Kamsky
Какую версию монго и какую версию пимонго вы используете? Asya Kamsky
@AsyaKamsky Python 2.7, MongoDb 2.0.4 и PyMongo 2.2. Sagar Hatekar
Имеются ли у документов, которые вы вставляете, "_id" поле? Asya Kamsky
@AsyaKamsky Да, они делают. Sagar Hatekar

Ваш Ответ

2   ответа
1

WRITEFUNCTION Перезвонитеhandle_dataв этом случае вызывается для каждой строки, просто загрузитеJSON непосредственно. Иногда, однако, может быть дваJSON объекты, содержащиеся в данных. Извините, я не могу опубликоватьcurl Команда, которую я использую, поскольку она содержит наши учетные данные. Но, как я уже сказал, это общая проблема, применимая к любому потоковому API.


def handle_data(self, buf): 
    try:
        self.tweet = json.loads(buf)
    except Exception as json_ex:
        self.data_list = buf.split('\r\n')
        for data in self.data_list:
            self.tweet_list.append(json.loads(data))    
3

                if self.chunk_count % 50 == 0
                    self.raw_tweets.insert(self.tweet_list)
                    self.chunk_count = 0

Вы сбрасываете chunk_count, но не сбрасываете tweet_list. Поэтому во второй раз вы пытаетесь вставить 100 элементов (50 новых плюс 50, которые уже были отправлены в БД накануне). Вы исправили это, но все еще видите разницу в производительности.

Целый размер партии оказывается красной сельдью. Я попытался использовать большой файл json и загрузить его через python, а не через mongoimport, и Python всегда был быстрее (даже в безопасном режиме - см. Ниже).

При более внимательном рассмотрении вашего кода я понял, что проблема заключается в том, что потоковый API на самом деле обрабатывает ваши данные кусками. Ожидается, что вы просто возьмете эти куски и поместите их в базу данных (это то, что делает mongoimport). Дополнительная работа, которую ваш питон выполняет для разделения потока, добавления его в список и последующей периодической отправки пакетов в Mongo, - это, вероятно, разница между тем, что я вижу, и тем, что вы видите.

Попробуйте этот фрагмент для вашего handle_data ()

def handle_data(self, data):
    try:
        string_buffer = StringIO(data)
        tweets = json.load(string_buffer)
    except Exception as ex:
        print "Exception occurred: %s" % str(ex)
    try:
        self.raw_tweets.insert(tweets)
    except Exception as ex:
        print "Exception occurred: %s" % str(ex)

Стоит отметить, что вашвставки Python не работают в «безопасном режиме» - вы должны изменить это, добавив аргументsafe=True на ваше заявление вставки. Затем вы получите исключение при любой неудачной вставке, и ваш try / catch напечатает ошибку, раскрывающую проблему.

Это также не требует больших затрат по производительности - в настоящее время я выполняю тестирование, и примерно через пять минут размеры двух коллекций составляют 14120 14113.

Так как вы вставляете только 1000 за раз, вы всегда видите кратное 1000 - кажется, что оно на самом деле не позади ...
Спасибо за указание на это! Я внес необходимые изменения (также обновил код выше): Добавлено & quot; self.tweet_list = [] & quot; после self.chunk_count = 0 и увеличил размер пакета до 1000. Он все еще кажется запаздывающим - число модулей python равно 5000, а комбо curl mongoimport равно 5718 (было 4000: 5662). Есть идеи? Sagar Hatekar
+1 за отличные комментарии! Sagar Hatekar
Да, но 4000: 5662 означает, что по-прежнему наблюдается задержка в 600 минут. право? Могут ли быть выполнены какие-либо оптимизации в этих двух местах - self.string_buffer = cStringIO.StringIO (data) для строки в self.string_buffer :? Sagar Hatekar
Кстати, я попробовал ваш код - с исправлением Python вставляет данные примерно в два раза быстрее, чем mongoimport. Это потому, что по умолчанию "безопасно" вставки выключены. Включив безопасную запись (передавая safe = True для вставки) вставки Python по-прежнему составляли около 75% времени моноимпорта.

Похожие вопросы