Vertica is a modern analytical database designed for querying and working with large datasets in an analytical capacity. In addition to its power, it has a gentle learning curve: its query frontend is modeled on PostgreSQL's dialect.
When working with Vertica, I often find myself needing to export full tables from a PostgreSQL database and load the data into a Vertica cluster. I've done this a few times now, so here are some tips for making the process smooth and painless:
Before exporting an entire table from Postgres, export a subset of it to make sure the pipeline works:
\copy (select * from profiles limit 10000) to '/mnt/tmp/profiles.tsv';
If there are any errors in your pipeline, you'll most likely encounter them within the first ten thousand rows. Then test loading that subset into Vertica to make sure there are no errors. Testing a small sample first is far faster than discovering a problem halfway through a full export.
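Before loading, a quick sanity check on the exported file can catch delimiter problems early. Here is a minimal sketch; the function name and the expected column count are my own, not part of any library:

```python
def check_tsv(path, expected_cols):
    """Verify every row in a tab-separated file (the default format
    psql's \\copy produces) has the expected number of columns.
    Returns a list of (line_number, column_count) for rows that don't match."""
    bad_rows = []
    with open(path) as f:
        for lineno, line in enumerate(f, start=1):
            cols = line.rstrip("\n").split("\t")
            if len(cols) != expected_cols:
                bad_rows.append((lineno, len(cols)))
    return bad_rows
```

If `check_tsv('/mnt/tmp/profiles.tsv', 12)` returns an empty list (12 being a placeholder for your table's column count), every row parsed into the same number of columns and the load is much more likely to go cleanly.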
Adding REJECTFILE to your COPY statement will write out the rows that were not inserted, so you can inspect them.
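After a load finishes, it's worth skimming that reject output. A small sketch for summarizing it; the helper name and the reject-file path in the usage example are assumptions, not Vertica APIs:

```python
def summarize_rejects(path, preview=3):
    """Return (count, first few lines) from a reject file so you can
    quickly see how many rows failed and what they look like."""
    with open(path) as f:
        lines = [line.rstrip("\n") for line in f]
    return len(lines), lines[:preview]
```

For example, `count, sample = summarize_rejects('/tmp/rejects.txt')` tells you at a glance whether you have a handful of malformed rows or a systematic delimiter problem.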
If you're handling more than a few million rows, use COPY DIRECT. That way, the data essentially goes straight to disk.
First, split up your file like this:
$ mkdir -p /tmp/parts/
$ split -C 500m largefile.csv /tmp/parts/largefile.csv-
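If you'd rather keep everything in one script, the same line-boundary splitting that `split -C` does can be sketched in Python (the function name and defaults below are my own; the coreutils version remains the battle-tested option):

```python
import os


def split_file(path, out_dir, max_bytes=500 * 1024 * 1024):
    """Split a text file into chunks of at most max_bytes each,
    breaking only on line boundaries (like `split -C`).
    Returns the list of chunk paths written."""
    os.makedirs(out_dir, exist_ok=True)
    base = os.path.basename(path)
    paths = []
    out, part, size = None, 0, 0
    with open(path, "rb") as f:
        for line in f:
            # Start a new chunk if this line would overflow the current one.
            if out is None or size + len(line) > max_bytes:
                if out:
                    out.close()
                out_path = os.path.join(out_dir, "%s-%03d" % (base, part))
                out = open(out_path, "wb")
                paths.append(out_path)
                part += 1
                size = 0
            out.write(line)
            size += len(line)
    if out:
        out.close()
    return paths
```

One caveat: a single line larger than `max_bytes` still lands whole in its own chunk here, whereas `split -C` would hard-split it.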
Once your file is split up, you can load it into Vertica using multiple loader threads:
import os
import queue
import threading

from vertica_python import connect

conn_dict = {
    'host': '127.0.0.1',
    'port': 5433,
    'user': '<< username >>',
    'password': '<< password >>',
    'database': '<< dbname >>'
}


class ThreadLoader(threading.Thread):
    def __init__(self, work_queue):
        threading.Thread.__init__(self)
        self.daemon = True  # Let the program exit once the queue is drained.
        self.queue = work_queue

    def run(self):
        # Each thread gets its own connection and loads files off the queue.
        conn = connect(**conn_dict)
        cursor = conn.cursor()
        while True:
            filename = self.queue.get()
            print("Loading %s..." % filename)
            cursor.execute("COPY << table_name >> FROM '%s' DELIMITER ',' DIRECT" % filename)
            print(cursor.fetchall())
            self.queue.task_done()


if __name__ == '__main__':
    dirname = '/tmp/parts/'  # Change this to the directory where the split files reside.
    work_queue = queue.Queue()

    # You can increase or decrease the number of loader threads by
    # changing the range below.
    for i in range(8):
        t = ThreadLoader(work_queue)
        t.start()

    # Load the split files into the queue.
    for filepath in os.listdir(dirname):
        print(filepath)
        work_queue.put(os.path.join(dirname, filepath))

    work_queue.join()