Em sua docstring, elasticsearch.helpers.async_bulk
descreve-se como um
Auxiliar para o :meth:
~elasticsearch.AsyncElasticsearch.bulk
api que fornece mais humana interface amigável - ele consome um iterador de ações e envia-los para o elasticsearch em pedaços. origem
Contexto
Tenho vindo a utilizar AsyncElasticsearch.bulk()
com êxito para enviar pandas dataframes para alguns ES instância
def _rec_to_actions(self, df):
for record in df.to_dict(orient="records"):
yield ('{ "index" : { "_index" : "%s" }}' % (self.index))
yield (json.dumps(record, default=int))
async def send_to_elasticsearch(self, df: DataFrame):
logger.info(f"{self.stage_name} sending batch to elastic")
await self.elastic_client.bulk(self._rec_to_actions(df))
Problema
No entanto, quando se trata de async_bulk
, Eu estou chegando index is missing
erros.
async def send_to_elasticsearch(self, df: DataFrame):
await async_bulk(self.elastic_client, self._rec_to_actions(df))
Tentei sintonizar _rec_to_actions()
de várias maneiras, sem muito efeito.
def _rec_to_actions(self, df):
for record in df.to_dict(orient="records"):
record["index"] = self.index
yield (json.dumps(record, default=int))
Eu acho que o principal problema é que eu não estou certo de saber o que é uma ação, no contexto do elasticsearch. Esta noção está em toda parte na documentação, mas não tem uma clara estrutura de dados em contrapartida esta biblioteca de código-fonte (nenhum que eu poderia encontrar, de qualquer forma)
O que é exatamente uma ação e como devo afinar meu gerador para enviar df dados do self.index
?
meio ambiente
- python = "3.9.5"
- elasticsearch = "7.14.1"