Use

To use pgcopy, instantiate a copy manager for your table, then pass it some data:

from datetime import datetime

import psycopg2
from pgcopy import CopyManager

# columns to populate, in the same order as the values in each record
cols = ('id', 'timestamp', 'location', 'temperature')
now = datetime.now()
records = [
    (0, now, 'Jerusalem', 72.2),
    (1, now, 'New York', 75.6),
    (2, now, 'Moscow', 54.3),
]
conn = psycopg2.connect(database='weather_db')
mgr = CopyManager(conn, 'measurements_table', cols)
mgr.copy(records)

# don't forget to commit!
conn.commit()

class pgcopy.CopyManager(conn, table, cols)

Facility for bulk-loading data using binary copy.

Inspects the database on instantiation for the column types.

Parameters
  • conn (psycopg2 connection) – a database connection

  • table (str) – the table name. Schema may be specified using dot notation: schema.table.

  • cols (list of str) – columns in the table into which to copy data

Raises

ValueError – if the table or columns do not exist.
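Because the constructor inspects the database, a misspelled table or column name surfaces as a ValueError at instantiation rather than during the copy. A minimal sketch of handling that case follows. The names `open_manager` and `manager_cls` are hypothetical, introduced here so the sketch runs without a database; in real use you would pass `pgcopy.CopyManager` as `manager_cls`.

```python
def open_manager(manager_cls, conn, table, cols):
    """Build a copy manager, returning None if the table or columns are missing.

    manager_cls is a hypothetical injection point for illustration;
    pass pgcopy.CopyManager in real code.
    """
    try:
        return manager_cls(conn, table, cols)
    except ValueError as exc:
        # Raised when the table or one of the columns does not exist.
        print(f"cannot prepare copy into {table!r}: {exc}")
        return None
```

With a live connection, a call such as `open_manager(CopyManager, conn, 'public.measurements_table', cols)` would also exercise the schema.table dot notation described above.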

copy(data[, fobject_factory])

Copy data into the database using a temporary file.

Parameters
  • data (iterable of iterables) – the data to be inserted

  • fobject_factory (function) – a callable that returns the file-like object used to hold the serialized data (by default, a temporary file on disk)

Data is serialized first in its entirety and then sent to the database. By default, a temporary file on disk is used. If you have enough memory, you can get a slight performance benefit with in-memory storage:

from io import BytesIO
mgr.copy(records, BytesIO)
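If the size of the dataset is not known in advance, a factory that starts in memory and spills to disk is one possible middle ground. The sketch below assumes that fobject_factory is called with no arguments and must return a writable, seekable, readable binary file object, as BytesIO does. The name `spooled_factory` and the 64 MiB threshold are illustrative choices, not part of pgcopy.

```python
from functools import partial
from tempfile import SpooledTemporaryFile

# Hypothetical factory: buffers in memory up to max_size bytes, then
# transparently rolls over to a temporary file on disk.
spooled_factory = partial(SpooledTemporaryFile, max_size=64 * 1024 * 1024)

# Assumed usage, with mgr and records as defined earlier:
# mgr.copy(records, spooled_factory)
```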

For very large datasets, serialization can be done directly to the database connection using threading_copy().

In most circumstances, however, network transfer and database processing take significantly longer than writing and reading a temporary file on a local disk.

ValueError is raised if a null value is provided for a column with a NOT NULL constraint.
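One way to handle this error is to roll back, so that the connection is left clean for later work. The following is a sketch rather than part of pgcopy: `copy_or_rollback` is a hypothetical helper, and it assumes `conn` is the psycopg2 connection the manager was created with.

```python
def copy_or_rollback(mgr, conn, records):
    """Copy records and commit; roll back and return False on ValueError."""
    try:
        mgr.copy(records)
    except ValueError as exc:
        # For example, a None value destined for a NOT NULL column.
        conn.rollback()
        print(f"copy rejected: {exc}")
        return False
    conn.commit()
    return True
```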

threading_copy(data)

Copy data, serializing directly to the database.

Parameters

data (iterable of iterables) – the data to be inserted
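Since data is described as an iterable, a generator is a natural fit here: rows can be produced lazily instead of being held in a list. The sketch below is illustrative only. `generate_readings` is a hypothetical function, its values are placeholders, and the column layout follows the measurements_table example at the top of this section.

```python
from datetime import datetime, timedelta

def generate_readings(count, start=None):
    """Yield (id, timestamp, location, temperature) rows one at a time."""
    start = start or datetime.now()
    for i in range(count):
        # One reading per minute; the values are placeholders.
        yield (i, start + timedelta(minutes=i), 'Jerusalem', 72.2)

# Assumed usage, with mgr and conn as defined earlier:
# mgr.threading_copy(generate_readings(1_000_000))
# conn.commit()
```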