ในของมัน docstring, elasticsearch.helpers.async_bulk
อธิบายว่าตัวเองเป็น
ช่วยทำงานสำหรับ:ยา:
~elasticsearch.AsyncElasticsearch.bulk
รูปแบบ api งนั้นให้ มากกว่ามนุษย์ส่วนติดต่อเพื่อน-มันพร้อมจะกลืนกิน iterator ของการกระทำและ ส่งพวกเขาต้อง elasticsearch อยู่ในนวนท่อนที่. แหล่งข่าว
คอนเท็กซ์
ฉันต้องใช้ AsyncElasticsearch.bulk()
เรียบร้อยแล้จะส่ง pandas dataframes บางอย่า ES ตัวอย่าง
def _rec_to_actions(self, df):
for record in df.to_dict(orient="records"):
yield ('{ "index" : { "_index" : "%s" }}' % (self.index))
yield (json.dumps(record, default=int))
async def send_to_elasticsearch(self, df: DataFrame):
logger.info(f"{self.stage_name} sending batch to elastic")
await self.elastic_client.bulk(self._rec_to_actions(df))
ปัญหา
อย่างไรก็ตามเมื่อมันมา async_bulk
ฉันกำลังจะ index is missing
เกิดข้อผิดพลาด.
async def send_to_elasticsearch(self, df: DataFrame):
await async_bulk(self.elastic_client, self._rec_to_actions(df))
พยายามทำร้ายร่างกาย _rec_to_actions()
ในหลายวิธีที่ไม่มีกลูกเล่น.
def _rec_to_actions(self, df):
for record in df.to_dict(orient="records"):
record["index"] = self.index
yield (json.dumps(record, default=int))
ฉันเดาว่าคงเป็นหลักปัญหาคือผมไม่ค่อยมั่นใจที่จะรู้ว่า อะไรเป็นการกระทำในคอนเท็กซ์ของ elasticsearch. นี่ซึ้งอยู่ทุกที่อยู่ในเอกสารคู่มือแต่ยังไม่มีเคลียร์ ข้อมูลของโครงสร้าง ที่เป็นคู่มือในห้องสมุด ด้วยรหัสต้นทางของ (ไม่ใช่เรื่องนั้นฉันอาจจะหาเจอยังไงก็ตา)
มันคืออะไรแน่นอน การกระทำ และฉันควรจะท่วงทำนองของฉันเครื่องมือสร้างส่ง df นข้อมูลให้ self.index
?
สภาพแวดล้อม
- ปลั๊กอินสำหรับไพธอน="3.9.5"
- elasticsearch="7.14.1"