forked from haxdai/ckanops
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathckantests.py
67 lines (47 loc) · 1.46 KB
/
ckantests.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import ckanapi
from ckanapi.errors import CKANAPIError
import ckanops
import webtest
import converters
import urllib2
import json
import random
import munge
# Connection settings are read from the environment; a missing variable
# raises KeyError as soon as this module is loaded.
host = os.environ['CKAN_HOST']
token = os.environ['CKAN_API_TOKEN']
# Wrap the host in a webtest TestApp and hand it to ckanapi's TestAppCKAN,
# passing the API token as the client's apikey. `demo` is the client used
# by the helpers and the script below.
test_app = webtest.TestApp(host)
demo = ckanapi.TestAppCKAN(test_app, apikey=token)
def create_should_succeed():
    """Try to create the 'my-dataset' package through the ``demo`` client.

    Prints 'denied' when the client raises ``ckanapi.NotAuthorized``; any
    other exception propagates to the caller. Returns None.
    """
    fields = {'name': 'my-dataset', 'title': 'not going to work'}
    try:
        demo.action.package_create(**fields)
    except ckanapi.NotAuthorized:
        print('denied')
def list_datasets():
    """Return the result of the ``package_list`` action on ``demo``."""
    package_list = demo.action.package_list
    return package_list()
def list_groups():
    """Return the result of ``group_list`` called with id='data-explorer'."""
    query = {'id': 'data-explorer'}
    return demo.action.group_list(**query)
def package_to_dcat(package):
    """Pass *package* to ``converters.ckan_to_dcat`` and return its result."""
    dcat = converters.ckan_to_dcat(package)
    return dcat
# pkg = ckanops.get_package('ciclones')
# print pkg
# print package_to_dcat(pkg)
# print 'Datasets'
# print list_datasets()
# print 'Groups'
# print list_groups()
# Load the catalog from the sedesol URL, then convert every entry under its
# 'dataset' key to a CKAN dict and upsert it through the `demo` client,
# reporting progress on stdout.
catalog = ckanops.dcat_to_utf8_dict("http://adela.datos.gob.mx/sedesol/catalogo.json")
print(catalog.get('title'))
for dataset in catalog.get('dataset', []):
    d = converters.dcat_to_ckan(dataset)
    # The dataset name is derived from its title via munge.
    d['name'] = munge.munge_title_to_name(d['title'])

    headline = "Creating dataset '%s'" % d['title']
    resource_note = "with %d resources" % len(d['resources'])
    print(headline + ' ' + resource_note)
    print("Name: %s" % d['name'])
    print("Org: %s" % d['owner_org'])

    # A truthy return value from upsert_dataset is reported as success.
    new_dataset = ckanops.upsert_dataset(demo, d)
    outcome = 'Dataset upserted' if new_dataset else 'Something went wrong'
    print(outcome)