Batch imports are an efficient way to add multiple data objects and cross-references.
To create a bulk import job, follow these steps:
- Initialize a batch object.
- Add items to the batch object.
- Ensure that the last batch is sent (flushed).
Basic import
The following example adds objects to the MyCollection collection.
data_rows = [
    {"title": f"Object {i+1}"} for i in range(5)
]

collection = client.collections.use("MyCollection")

with collection.batch.fixed_size(batch_size=200) as batch:
    for data_row in data_rows:
        batch.add_object(
            properties=data_row,
        )
        if batch.number_errors > 10:
            print("Batch import stopped due to excessive errors.")
            break

failed_objects = collection.batch.failed_objects
if failed_objects:
    print(f"Number of failed imports: {len(failed_objects)}")
    print(f"First failed object: {failed_objects[0]}")
Error handling
During a batch import, any failed objects or references will be stored and can be obtained through batch.failed_objects and batch.failed_references.
Additionally, a running count of failed objects and references is maintained and can be accessed through batch.number_errors within the context manager.
This counter can be used to stop the import process in order to investigate the failed objects or references.
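For example, both failure lists can be inspected after the context manager exits. This is a minimal sketch that assumes a collection handle as in the examples above, with objects and references added inside the batch:

collection = client.collections.use("MyCollection")

with collection.batch.fixed_size(batch_size=200) as batch:
    ...  # add objects and references here, as in the other examples

# The batch is flushed when the context manager exits
failed_objects = collection.batch.failed_objects
failed_references = collection.batch.failed_references

if failed_objects:
    print(f"Number of failed objects: {len(failed_objects)}")
if failed_references:
    print(f"Number of failed references: {len(failed_references)}")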
Find out more about error handling on the Python client reference page.
let dataObjects = [
  { title: 'Object 1' },
  { title: 'Object 2' },
  { title: 'Object 3' },
]

const myCollection = client.collections.use('MyCollection')
const response = await myCollection.data.insertMany(dataObjects);
console.log(response);
className := "MyCollection"

dataObjs := []models.PropertySchema{}
for i := 0; i < 5; i++ {
    dataObjs = append(dataObjs, map[string]interface{}{
        "title": fmt.Sprintf("Object %v", i),
    })
}

batcher := client.Batch().ObjectsBatcher()
for _, dataObj := range dataObjs {
    batcher.WithObjects(&models.Object{
        Class:      className,
        Properties: dataObj,
    })
}

batcher.Do(ctx)
List<Map<String, Object>> dataRows = new ArrayList<>();
for (int i = 0; i < 5; i++) {
  dataRows.add(Map.of("title", "Object " + (i + 1)));
}

var collection = client.collections.use("MyCollection");
var response = collection.data.insertMany(dataRows.toArray(new Map[0]));

if (!response.errors().isEmpty()) {
  System.err.println("Number of failed imports: " + response.errors().size());
  System.err.println("First failed object: " + response.errors().get(0));
}
String className = "MyCollection";

List<Map<String, Object>> dataObjs = new ArrayList<>();
for (int i = 0; i < 5; i++) {
  Map<String, Object> properties = new HashMap<>();
  properties.put("title", String.format("Object %s", i));
  dataObjs.add(properties);
}

ObjectsBatcher batcher = client.batch().objectsBatcher();
for (Map<String, Object> properties : dataObjs) {
  batcher.withObject(WeaviateObject.builder()
      .className(className)
      .properties(properties)
      .build()
  );
}

batcher.run();
var dataRows = Enumerable.Range(0, 5).Select(i => new { title = $"Object {i + 1}" }).ToList();

var collection = client.Collections.Use<object>("MyCollection");
var response = await collection.Data.InsertMany(dataRows);

var failedObjects = response.Where(r => r.Error != null).ToList();
if (failedObjects.Any())
{
    Console.WriteLine($"Number of failed imports: {failedObjects.Count}");
    Console.WriteLine($"First failed object: {failedObjects.First().Error}");
}
Server-side batching
Server-side batching was added in v1.34 as a preview.
This means that the feature is still under development and may change in future releases, including potential breaking changes.
We do not recommend using this feature in production environments at this time.
The following example imports objects into a collection named MyCollection using server-side batch imports. The client sizes its batches dynamically, based on feedback from the server.
data_rows = [
    {"title": f"Object {i+1}"} for i in range(5)
]

collection = client.collections.get("MyCollection")

with collection.batch.experimental() as batch:
    for data_row in data_rows:
        batch.add_object(
            properties=data_row,
        )
        if batch.number_errors > 10:
            print("Batch import stopped due to excessive errors.")
            break

failed_objects = collection.batch.failed_objects
if failed_objects:
    print(f"Number of failed imports: {len(failed_objects)}")
    print(f"First failed object: {failed_objects[0]}")
Use the gRPC API
The gRPC API is faster than the REST API. Use the gRPC API to improve import speeds.
The Python client uses gRPC by default.
The legacy Python client does not support gRPC.
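For example, the current Python client lets you set the gRPC port explicitly when connecting. This is a minimal sketch that assumes a local instance on the default ports (8080 for REST, 50051 for gRPC):

import weaviate

client = weaviate.connect_to_local(
    host="localhost",
    port=8080,        # REST port
    grpc_port=50051,  # gRPC port
)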
The TypeScript client v3 uses gRPC by default.
The legacy TypeScript client does not support gRPC.
The Java client v6 uses gRPC by default.
To use the gRPC API with earlier versions of the Java client, add the setGRPCHost field to your client connection code. Update setGRPCSecured if you use an encrypted connection.
Config config = new Config("http", "localhost:8080");
config.setGRPCSecured(false);
config.setGRPCHost("localhost:50051");
To use the gRPC API with the Go client, add the GrpcConfig field to your client connection code. Update Secured if you use an encrypted connection.
cfg := weaviate.Config{
    Host:   fmt.Sprintf("localhost:%v", "8080"),
    Scheme: "http",
    GrpcConfig: &grpc.Config{
        Host:    "localhost:50051",
        Secured: false,
    },
}

client, err := weaviate.NewClient(cfg)
if err != nil {
    panic(err) // handle the connection error appropriately
}
The C# client uses gRPC by default.
To use the gRPC API with the Spark connector, add the grpc:host field to your client connection code. Update grpc:secured if you use an encrypted connection.
df.write
  .format("io.weaviate.spark.Weaviate")
  .option("scheme", "http")
  .option("host", "localhost:8080")
  .option("grpc:host", "localhost:50051")
  .option("grpc:secured", "false")
  .option("className", className)
  .mode("append")
  .save()
Specify an ID value
Weaviate generates a UUID for each object. Object IDs must be unique. If you set object IDs, use one of these deterministic UUID methods to prevent duplicate IDs:
- generate_uuid5 (Python)
- generateUuid5 (TypeScript)
from weaviate.util import generate_uuid5

data_rows = [{"title": f"Object {i+1}"} for i in range(5)]

collection = client.collections.use("MyCollection")

with collection.batch.fixed_size(batch_size=200) as batch:
    for data_row in data_rows:
        obj_uuid = generate_uuid5(data_row)
        batch.add_object(
            properties=data_row,
            uuid=obj_uuid
        )
        if batch.number_errors > 10:
            print("Batch import stopped due to excessive errors.")
            break

failed_objects = collection.batch.failed_objects
if failed_objects:
    print(f"Number of failed imports: {len(failed_objects)}")
    print(f"First failed object: {failed_objects[0]}")
import { generateUuid5 } from 'weaviate-client';

let dataObjects = [
  {
    properties: { title: 'Object 1' },
    id: generateUuid5('MyCollection', 'Object 1'),
  },
  {
    properties: { title: 'Object 2' },
    id: generateUuid5('MyCollection', 'Object 2'),
  },
]

const myCollection = client.collections.use('MyCollection')
await myCollection.data.insertMany(dataObjects)
generateUUID := func(input string) strfmt.UUID {
    input = strings.ToLower(input)
    hash := md5.Sum([]byte(input))
    uuid := fmt.Sprintf("%x-%x-%x-%x-%x", hash[0:4], hash[4:6], hash[6:8], hash[8:10], hash[10:])
    return strfmt.UUID(uuid)
}

className := "MyCollection"

dataObjs := []models.PropertySchema{}
for i := 0; i < 5; i++ {
    dataObjs = append(dataObjs, map[string]interface{}{
        "title": fmt.Sprintf("Object %v", i),
    })
}

batcher := client.Batch().ObjectsBatcher()
for _, dataObj := range dataObjs {
    batcher.WithObjects(&models.Object{
        Class:      className,
        Properties: dataObj,
        ID:         generateUUID((dataObj.(map[string]interface{}))["title"].(string)),
    })
}

batcher.Do(ctx)
List<WeaviateObject<Map<String, Object>, Reference, ObjectMetadata>> dataObjects =
    new ArrayList<>();
for (int i = 0; i < 5; i++) {
  Map<String, Object> dataRow = Map.of("title", "Object " + (i + 1));
  UUID objUuid = generateUuid5(dataRow.toString());
  dataObjects.add(WeaviateObject.of(
      obj -> obj.properties(dataRow).metadata(ObjectMetadata.of(meta -> meta.uuid(objUuid)))));
}

var collection = client.collections.use("MyCollection");
var response = collection.data.insertMany(dataObjects);

if (!response.errors().isEmpty()) {
  System.err.println("Number of failed imports: " + response.errors().size());
  System.err.println("First failed object: " + response.errors().get(0));
}
String className = "MyCollection";

List<Map<String, Object>> dataObjs = new ArrayList<>();
for (int i = 0; i < 5; i++) {
  Map<String, Object> properties = new HashMap<>();
  properties.put("title", String.format("Object %s", i));
  dataObjs.add(properties);
}

ObjectsBatcher batcher = client.batch().objectsBatcher();
for (Map<String, Object> properties : dataObjs) {
  batcher.withObject(WeaviateObject.builder()
      .className(className)
      .properties(properties)
      .id(UUID.nameUUIDFromBytes(((String) properties.get("title")).getBytes()).toString())
      .build()
  );
}

batcher.run();
var dataToInsert = new List<(object properties, Guid uuid)>();
for (int i = 0; i < 5; i++)
{
    var dataRow = new { title = $"Object {i + 1}" };
    var objUuid = GenerateUuid5(JsonSerializer.Serialize(dataRow));
    dataToInsert.Add((dataRow, objUuid));
}

var collection = client.Collections.Use<object>("MyCollection");
var response = await collection.Data.InsertMany(dataToInsert);

var failedObjects = response.Where(r => r.Error != null).ToList();
if (failedObjects.Any())
{
    Console.WriteLine($"Number of failed imports: {failedObjects.Count}");
    Console.WriteLine($"First failed object: {failedObjects.First().Error}");
}
Specify a vector
Use the vector property to specify a vector for each object.
data_rows = [{"title": f"Object {i+1}"} for i in range(5)]
vectors = [[0.1] * 1536 for i in range(5)]

collection = client.collections.use("MyCollection")

with collection.batch.fixed_size(batch_size=200) as batch:
    for i, data_row in enumerate(data_rows):
        batch.add_object(
            properties=data_row,
            vector=vectors[i]
        )
        if batch.number_errors > 10:
            print("Batch import stopped due to excessive errors.")
            break

failed_objects = collection.batch.failed_objects
if failed_objects:
    print(f"Number of failed imports: {len(failed_objects)}")
    print(f"First failed object: {failed_objects[0]}")
const myCollection = client.collections.use('MyCollection')

let dataObjects = [
  {
    properties: { title: 'Object 1' },
    vectors: Array(100).fill(0.1111),
  },
  {
    properties: { title: 'Object 2' },
    vectors: Array(100).fill(0.2222),
  },
]

await myCollection.data.insertMany(dataObjects)
className := "MyCollection"

dataObjs := []models.PropertySchema{}
for i := 0; i < 5; i++ {
    dataObjs = append(dataObjs, map[string]interface{}{
        "title": fmt.Sprintf("Object %v", i),
    })
}

vectors := [][]float32{}
for i := 0; i < 5; i++ {
    vector := make([]float32, 10)
    for j := range vector {
        vector[j] = 0.25 + float32(j)/100 // example vector values
    }
    vectors = append(vectors, vector)
}

batcher := client.Batch().ObjectsBatcher()
for i, dataObj := range dataObjs {
    batcher.WithObjects(&models.Object{
        Class:      className,
        Properties: dataObj,
        Vector:     vectors[i],
    })
}

batcher.Do(ctx)
List<WeaviateObject<Map<String, Object>, Reference, ObjectMetadata>> dataObjects =
    new ArrayList<>();
float[] vector = new float[10];
Arrays.fill(vector, 0.1f);
for (int i = 0; i < 5; i++) {
  Map<String, Object> dataRow = Map.of("title", "Object " + (i + 1));
  UUID objUuid = generateUuid5(dataRow.toString());
  dataObjects.add(WeaviateObject.of(obj -> obj.properties(dataRow)
      .metadata(ObjectMetadata.of(meta -> meta.uuid(objUuid).vectors(Vectors.of(vector))))));
}

var collection = client.collections.use("MyCollection");
var response = collection.data.insertMany(dataObjects);

if (!response.errors().isEmpty()) {
  System.err.println("Number of failed imports: " + response.errors().size());
  System.err.println("First failed object: " + response.errors().get(0));
}
String className = "MyCollection";

List<Map<String, Object>> dataObjs = new ArrayList<>();
for (int i = 0; i < 5; i++) {
  Map<String, Object> properties = new HashMap<>();
  properties.put("title", String.format("Object %s", i));
  dataObjs.add(properties);
}

List<Float[]> vectors = new ArrayList<>();
for (int i = 0; i < 5; i++) {
  Float[] vector = new Float[10];
  Arrays.fill(vector, 0.25f + i / 100f);
  vectors.add(vector);
}

ObjectsBatcher batcher = client.batch().objectsBatcher();
for (int i = 0; i < 5; i++) {
  batcher.withObject(WeaviateObject.builder()
      .className(className)
      .properties(dataObjs.get(i))
      .vector(vectors.get(i))
      .build()
  );
}

batcher.run();
var dataToInsert = new List<(object properties, Guid uuid, float[] vector)>();
var vector = Enumerable.Repeat(0.1f, 10).ToArray();
for (int i = 0; i < 5; i++)
{
    var dataRow = new { title = $"Object {i + 1}" };
    var objUuid = GenerateUuid5(JsonSerializer.Serialize(dataRow));
    dataToInsert.Add((dataRow, objUuid, vector));
}

var collection = client.Collections.Use<object>("MyCollection");
var response = await collection.Data.InsertMany(dataToInsert);

var failedObjects = response.Where(r => r.Error != null).ToList();
if (failedObjects.Any())
{
    Console.WriteLine($"Number of failed imports: {failedObjects.Count}");
    Console.WriteLine($"First failed object: {failedObjects.First().Error}");
}
Specify named vectors
When you create an object, you can specify named vectors (if configured in your collection).
data_rows = [{
    "title": f"Object {i+1}",
    "body": f"Body {i+1}"
} for i in range(5)]

title_vectors = [[0.12] * 1536 for _ in range(5)]
body_vectors = [[0.34] * 1536 for _ in range(5)]

collection = client.collections.use("MyCollection")

with collection.batch.fixed_size(batch_size=200) as batch:
    for i, data_row in enumerate(data_rows):
        batch.add_object(
            properties=data_row,
            vector={
                "title": title_vectors[i],
                "body": body_vectors[i],
            }
        )
        if batch.number_errors > 10:
            print("Batch import stopped due to excessive errors.")
            break

failed_objects = collection.batch.failed_objects
if failed_objects:
    print(f"Number of failed imports: {len(failed_objects)}")
    print(f"First failed object: {failed_objects[0]}")
const myCollection = client.collections.use("MyCollection")
let dataObjects = [
{
properties: { title: 'Object 1' },
vectors: {
title: Array(100).fill(0.1111),
body: Array(100).fill(0.9999),
}
},
{
properties: { title: 'Object 2' },
vectors: {
title: Array(100).fill(0.2222),
body: Array(100).fill(0.8888),
}
},
]
await myCollection.data.insertMany(dataObjects)
}
List<Map<String, Object>> dataRows = new ArrayList<>();
List<float[]> titleVectors = new ArrayList<>();
List<float[]> bodyVectors = new ArrayList<>();
for (int i = 0; i < 5; i++) {
  dataRows.add(Map.of("title", "Object " + (i + 1), "body", "Body " + (i + 1)));
  float[] titleVector = new float[1536];
  Arrays.fill(titleVector, 0.12f);
  titleVectors.add(titleVector);
  float[] bodyVector = new float[1536];
  Arrays.fill(bodyVector, 0.34f);
  bodyVectors.add(bodyVector);
}

CollectionHandle<Map<String, Object>> collection = client.collections.use("MyCollection");

List<WeaviateObject<Map<String, Object>, Reference, ObjectMetadata>> objectsToInsert =
    new ArrayList<>();
for (int i = 0; i < dataRows.size(); i++) {
  int index = i;
  objectsToInsert.add(
      new WeaviateObject.Builder<Map<String, Object>, Reference, ObjectMetadata>()
          .properties(dataRows.get(index))
          .metadata(ObjectMetadata
              .of(meta -> meta.vectors(Vectors.of("title", titleVectors.get(index)),
                  Vectors.of("body", bodyVectors.get(index)))))
          .build());
}

InsertManyResponse response = collection.data.insertMany(objectsToInsert);

if (!response.errors().isEmpty()) {
  System.err.printf("Number of failed imports: %d\n", response.errors().size());
  System.err.printf("First failed object error: %s\n", response.errors().get(0));
}
var dataToInsert = new List<(object properties, Dictionary<string, float[]> vectors)>();
for (int i = 0; i < 5; i++)
{
    var dataRow = new { title = $"Object {i + 1}", body = $"Body {i + 1}" };
    var titleVector = Enumerable.Repeat(0.12f, 1536).ToArray();
    var bodyVector = Enumerable.Repeat(0.34f, 1536).ToArray();
    var namedVectors = new Dictionary<string, float[]>
    {
        { "title", titleVector },
        { "body", bodyVector }
    };
    dataToInsert.Add((dataRow, namedVectors));
}

var collection = client.Collections.Use<object>("MyCollection");
var response = await collection.Data.InsertMany(dataToInsert);

var failedObjects = response.Where(r => r.Error != null).ToList();
if (failedObjects.Any())
{
    Console.WriteLine($"Number of failed imports: {failedObjects.Count}");
    Console.WriteLine($"First failed object error: {failedObjects.First().Error}");
}
Import with references
You can batch create links from one object to another through cross-references.
collection = client.collections.use("Author")
with collection.batch.fixed_size(batch_size=100) as batch:
batch.add_reference(
from_property="writesFor",
from_uuid=from_uuid,
to=target_uuid,
)
failed_references = collection.batch.failed_references
if failed_references:
print(f"Number of failed imports: {len(failed_references)}")
print(f"First failed reference: {failed_references[0]}")
var collection = client.collections.use("Author");
var response =
collection.data.referenceAddMany(BatchReference.uuids(from, "writesFor", targetUuid));
if (!response.errors().isEmpty()) {
System.err.println("Number of failed imports: " + response.errors().size());
System.err.println("First failed object: " + response.errors().get(0));
}
var collection = client.Collections.Use<object>("Author");
var response = await collection.Data.ReferenceAddMany(new DataReference(fromUuid, "writesFor", targetUuid));
if (response.HasErrors)
{
Console.WriteLine($"Number of failed imports: {response.Errors.Count}");
Console.WriteLine($"First failed object: {response.Errors.First()}");
}
Python-specific considerations
The Python clients have built-in batching methods to help you optimize import speed. For details, see the client documentation.
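For example, the current Python client exposes three batching modes. This is a minimal sketch; the collection name and rate-limit value are placeholders:

# Dynamic batching: the client adjusts the batch size automatically
with client.batch.dynamic() as batch:
    batch.add_object(collection="MyCollection", properties={"title": "Object 1"})

# Fixed-size batching: you choose the batch size and concurrency
with client.batch.fixed_size(batch_size=200, concurrent_requests=2) as batch:
    batch.add_object(collection="MyCollection", properties={"title": "Object 2"})

# Rate-limited batching: useful when a model provider enforces request limits
with client.batch.rate_limit(requests_per_minute=600) as batch:
    batch.add_object(collection="MyCollection", properties={"title": "Object 3"})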
Async Python client and batching
Currently, the async Python client does not support batching. To use batching, use the sync Python client.
Stream data from large files
If your dataset is large, consider streaming the import to avoid out-of-memory issues.
To try the example code, download the sample data and create the sample input files.
Get the sample data
import json
import requests

response = requests.get(
    "https://raw.githubusercontent.com/weaviate-tutorials/intro-workshop/main/data/jeopardy_1k.json"
)
data = response.json()

with open('jeopardy_1k.json', 'w') as f:
    json.dump(data, f)
import fs from 'fs';

// Uncomment these imports to save as csv
// import { mkConfig, generateCsv, asString } from "export-to-csv";
// import { writeFile } from "node:fs";
// import { Buffer } from "node:buffer";

// Get the data file
async function getJsonData() {
  const file = await fetch(
    'https://raw.githubusercontent.com/weaviate-tutorials/intro-workshop/main/data/jeopardy_1k.json'
  );
  const data = await file.json()

  // Save as json
  fs.writeFile("jeopardy_1k.json", JSON.stringify(data), function (err) {
    if (err) {
      console.log(err);
    }
  });

  // // Uncomment this section to save as csv
  // const csvConfig = mkConfig({ useKeysAsHeaders: true, filename: "jeopardy_1k" });
  // const csv = generateCsv(csvConfig)(data);
  // const filename = `${csvConfig.filename}.csv`;
  // const csvBuffer = new Uint8Array(Buffer.from(asString(csv)));

  // // Write the csv file to disk
  // writeFile(filename, csvBuffer, (err) => {
  //   if (err) throw err;
  //   console.log("file saved: ", filename);
  // });
}

await getJsonData();
Stream JSON files example code
import ijson

counter = 0
interval = 200

print("JSON streaming, to avoid running out of memory on large files...")
with client.batch.fixed_size(batch_size=100) as batch:
    with open("jeopardy_1k.json", "rb") as f:
        objects = ijson.items(f, "item")
        for obj in objects:
            properties = {
                "question": obj["Question"],
                "answer": obj["Answer"],
            }
            batch.add_object(
                collection="JeopardyQuestion",
                properties=properties,
            )
            counter += 1
            if counter % interval == 0:
                print(f"Imported {counter} articles...")

print(f"Finished importing {counter} articles.")
import weaviate from 'weaviate-client';
import fs from 'fs';
import parser from 'stream-json';
import StreamArray from 'stream-json/streamers/StreamArray';
import Chain from 'stream-chain';

let batcher = client.batch.objectsBatcher();
let counter = 0;
const batchSize = 20;

async function addObject(obj: object): Promise<void> {
  const properties = {
    question: obj['Question'],
    answer: obj['Answer'],
  };

  batcher = batcher.withObject({
    class: 'JeopardyQuestion',
    properties,
  });

  counter++;
  if (counter % batchSize === 0) {
    const response = await batcher.do();
    batcher = client.batch.objectsBatcher();

    for (const r of response)
      if (r.result.errors)
        throw r.result.errors;

    console.log(`Imported ${counter} articles...`);
  }
}

async function importJson(filePath) {
  const pipeline = new Chain([
    fs.createReadStream(filePath),
    parser(),
    new StreamArray(),
  ]);

  for await (const { value } of pipeline) {
    await addObject(value);
  }
}

await importJson('jeopardy_1k.json');

if (batcher.payload().objects.length > 0)
  await batcher.do();

console.log(`Finished importing ${counter} articles.`);
int batchSize = 100;
List<Map<String, Object>> batch = new ArrayList<>(batchSize);

var collection = client.collections.use("JeopardyQuestion");
Gson gson = new Gson();

System.out.println("JSON streaming, to avoid running out of memory on large files...");
try (JsonReader reader = new JsonReader(new FileReader("jeopardy_1k.json"))) {
  reader.beginArray();
  while (reader.hasNext()) {
    Map<String, String> obj = gson.fromJson(reader, Map.class);
    Map<String, Object> properties = new HashMap<>();
    properties.put("question", obj.get("Question"));
    properties.put("answer", obj.get("Answer"));
    batch.add(properties);

    if (batch.size() == batchSize) {
      collection.data.insertMany(batch.toArray(new Map[0]));
      System.out.println("Imported " + batch.size() + " articles...");
      batch.clear();
    }
  }
  reader.endArray();
}

if (!batch.isEmpty()) {
  collection.data.insertMany(batch.toArray(new Map[0]));
  System.out.println("Imported remaining " + batch.size() + " articles...");
}

System.out.println("Finished importing articles.");
int batchSize = 100;
var batch = new List<object>(batchSize);

var collection = client.Collections.Use<object>("JeopardyQuestion");

Console.WriteLine("JSON streaming, to avoid running out of memory on large files...");
using var fileStream = File.OpenRead(JsonDataFile);
var jsonObjects = JsonSerializer.DeserializeAsyncEnumerable<Dictionary<string, object>>(fileStream);
await foreach (var obj in jsonObjects)
{
    if (obj == null) continue;
    var properties = new { question = obj["Question"], answer = obj["Answer"] };
    batch.Add(properties);

    if (batch.Count == batchSize)
    {
        await collection.Data.InsertMany(batch);
        Console.WriteLine($"Imported {batch.Count} articles...");
        batch.Clear();
    }
}

if (batch.Any())
{
    await collection.Data.InsertMany(batch);
    Console.WriteLine($"Imported remaining {batch.Count} articles...");
}

Console.WriteLine("Finished importing articles.");
Stream CSV files example code
import pandas as pd

counter = 0
interval = 200

print("pandas dataframe iterator with lazy-loading, to not load all records in RAM at once...")
with client.batch.fixed_size(batch_size=200) as batch:
    with pd.read_csv(
        "jeopardy_1k.csv",
        usecols=["Question", "Answer", "Category"],
        chunksize=100,  # number of rows per chunk
    ) as csv_iterator:
        for chunk in csv_iterator:
            for index, row in chunk.iterrows():
                properties = {
                    "question": row["Question"],
                    "answer": row["Answer"],
                }
                batch.add_object(
                    collection="JeopardyQuestion",
                    properties=properties,
                )
                counter += 1
                if counter % interval == 0:
                    print(f"Imported {counter} articles...")

print(f"Finished importing {counter} articles.")
import weaviate from 'weaviate-client';
import fs from 'fs';
import csv from 'csv-parser';

let batcher = client.batch.objectsBatcher();
let counter = 0;
const batchSize = 20;

async function addObject(obj: object): Promise<void> {
  const properties = {
    question: obj['Question'],
    answer: obj['Answer'],
  };

  batcher = batcher.withObject({
    class: 'JeopardyQuestion',
    properties,
  });

  counter++;
  if (counter % batchSize === 0) {
    const response = await batcher.do();
    batcher = client.batch.objectsBatcher();

    for (const r of response)
      if (r.result.errors)
        throw r.result.errors;

    console.log(`Imported ${counter} articles...`);
  }
}

async function importCSV(filePath) {
  const stream = fs.createReadStream(filePath).pipe(csv());
  for await (const row of stream) {
    await addObject(row);
  }
}

await importCSV('jeopardy_1k.csv');

if (batcher.payload().objects.length > 0)
  await batcher.do();

console.log(`Finished importing ${counter} articles.`);
int batchSize = 100;
List<Map<String, Object>> batch = new ArrayList<>(batchSize);

var collection = client.collections.use("JeopardyQuestion");

System.out.println("CSV streaming to not load all records in RAM at once...");
try (BufferedReader csvReader = new BufferedReader(new FileReader("jeopardy_1k.csv"))) {
  String line = csvReader.readLine(); // skip the header row
  while ((line = csvReader.readLine()) != null) {
    String[] data = line.split("\",\"");
    Map<String, Object> properties = new HashMap<>();
    properties.put("question", data[0].substring(1));
    properties.put("answer", data[1].substring(0, data[1].length() - 1));
    batch.add(properties);

    if (batch.size() == batchSize) {
      collection.data.insertMany(batch.toArray(new Map[0]));
      System.out.println("Imported " + batch.size() + " articles...");
      batch.clear();
    }
  }
}

if (!batch.isEmpty()) {
  collection.data.insertMany(batch.toArray(new Map[0]));
  System.out.println("Imported remaining " + batch.size() + " articles...");
}

System.out.println("Finished importing articles.");
int batchSize = 100;
var batch = new List<object>(batchSize);

var collection = client.Collections.Use<object>("JeopardyQuestion");

Console.WriteLine("CSV streaming to not load all records in RAM at once...");
using (var reader = new StreamReader(CsvDataFile))
using (var csv = new CsvReader(reader, CultureInfo.InvariantCulture))
{
    var records = csv.GetRecords<JeopardyQuestion>();
    foreach (var record in records)
    {
        var properties = new { question = record.Question, answer = record.Answer };
        batch.Add(properties);

        if (batch.Count == batchSize)
        {
            await collection.Data.InsertMany(batch);
            Console.WriteLine($"Imported {batch.Count} articles...");
            batch.Clear();
        }
    }
}

if (batch.Any())
{
    await collection.Data.InsertMany(batch);
    Console.WriteLine($"Imported remaining {batch.Count} articles...");
}

Console.WriteLine("Finished importing articles.");
Batch vectorization
Some model providers offer batch vectorization APIs, where each request can include multiple objects.
From Weaviate v1.25.0, a batch import automatically makes use of the model providers' batch vectorization APIs where available. This reduces the number of requests to the model provider, improving throughput.
Model provider configurations
You can configure the batch vectorization settings for each model provider, such as the requests per minute or tokens per minute. The following example sets rate limits for the Cohere and OpenAI integrations, and provides API keys for both.
Note that each provider exposes different configuration options.
from weaviate.classes.config import Integrations

integrations = [
    Integrations.cohere(
        api_key=cohere_key,
        requests_per_minute_embeddings=rpm_embeddings,
    ),
    Integrations.openai(
        api_key=openai_key,
        requests_per_minute_embeddings=rpm_embeddings,
        tokens_per_minute_embeddings=tpm_embeddings,
    ),
]
client.integrations.configure(integrations)
Additional considerations
Data imports can be resource intensive. Consider the following when you import large amounts of data.
Asynchronous imports
Available starting in v1.22. This is an experimental feature. Use with caution.
To maximize import speed, enable asynchronous indexing.
To enable asynchronous indexing, set the ASYNC_INDEXING environment variable to true in your Weaviate configuration file.
weaviate:
  image: cr.weaviate.io/semitechnologies/weaviate:1.34.0
  ...
  environment:
    ASYNC_INDEXING: 'true'
  ...
Automatically add new tenants
By default, Weaviate returns an error if you try to insert an object into a non-existent tenant. To change this behavior so Weaviate creates a new tenant, set autoTenantCreation to true in the collection definition.
The auto-tenant feature is available from v1.25.0 for batch imports, and from v1.25.2 for single object insertions as well.
Set autoTenantCreation when you create the collection, or reconfigure the collection to update the setting as needed.
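For example, with the Python client; a minimal sketch, where the collection name is a placeholder:

from weaviate.classes.config import Configure, Reconfigure

# Enable auto-tenant creation when creating the collection
client.collections.create(
    "MyCollection",
    multi_tenancy_config=Configure.multi_tenancy(
        enabled=True,
        auto_tenant_creation=True,
    ),
)

# Or update the setting on an existing collection
collection = client.collections.use("MyCollection")
collection.config.update(
    multi_tenancy_config=Reconfigure.multi_tenancy(auto_tenant_creation=True),
)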
Automatic tenant creation is useful when you import a large number of objects. Be cautious if your data is likely to have small inconsistencies or typos. For example, the names TenantOne, tenantOne, and TenntOne will create three different tenants.
For details, see auto-tenant.
Questions and feedback
If you have any questions or feedback, let us know in the user forum.