diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8731f78 --- /dev/null +++ b/.gitignore @@ -0,0 +1,54 @@ + +*.class +.*.swp +.beamer +# Package Files # +*.jar +*.war +*.ear +*.versionsBackup + +# Intellij Files & Dir # +*.iml +*.ipr +*.iws +atlassian-ide-plugin.xml +out/ +.DS_Store +./lib/ +.idea + +# Gradle Files & Dir # +build/ +.gradle/ +.stickyStorage +.build/ +target/ + +# Node log +npm-*.log +logs/ + +# Singlenode and test data files. +/templates/ +/data/ +/data-fabric-tests/data/ + +# ANTLR4 +/core/gen +*.tokens +DirectivesLexer.java +DirectivesParser.java +DirectivesBaseListener.java +DirectivesBaseVisitor.java +DirectivesListener.java +DirectivesVisitor.java + +# generated by docs build +*.py + +# Remove release.properties +release.properties + +# Remove dev directory. +dev \ No newline at end of file diff --git a/checkstyle.xml b/checkstyle.xml new file mode 100644 index 0000000..04c6eda --- /dev/null +++ b/checkstyle.xml @@ -0,0 +1,397 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/docs/MarketoReportingPlugin-batchsource.md b/docs/MarketoReportingPlugin-batchsource.md new file mode 100644 index 0000000..5451214 --- /dev/null +++ b/docs/MarketoReportingPlugin-batchsource.md @@ -0,0 +1,26 @@ +# Marketo Reporting Batch Source + +Description +----------- +This plugin is used to query Leads or Activities entities for specified date range from Marketo. + +Properties +---------- +### General + +**Reference Name:** Name used to uniquely identify this source for lineage, annotating metadata, etc. + +**Rest API endpoint:** Marketo rest API endpoint, unique for each client. +### Authentication + +**Client ID:** Client ID. + +**Client Secret:** Client secret. + +### Report + +**Report Type:** Type of report. One of 'leads' or 'activities'. + +**Start Date:** Start date of report. In ISO 8601 format(1997-07-16T19:20:30+01:00). + +**End Date:** End date of report. In ISO 8601 format(1997-07-16T19:20:30+01:00). \ No newline at end of file diff --git a/icons/MarketoReportingPlugin-batchsource.png b/icons/MarketoReportingPlugin-batchsource.png new file mode 100644 index 0000000..4abe7e7 Binary files /dev/null and b/icons/MarketoReportingPlugin-batchsource.png differ diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..f7a867b --- /dev/null +++ b/pom.xml @@ -0,0 +1,439 @@ + + + + 4.0.0 + + io.cdap.plugin + marketo-entity-plugin + 1.0.0-SNAPSHOT + + + + sonatype + https://oss.sonatype.org/content/groups/public + + + sonatype-snapshots + https://oss.sonatype.org/content/repositories/snapshots + + + + + 6.1.0-SNAPSHOT + 2.8.0 + 4.5.9 + 2.3.0-SNAPSHOT + 2.1.3 + + + + + io.cdap.cdap + cdap-api + ${cdap.version} + provided + + + io.cdap.cdap + cdap-etl-api + ${cdap.version} + provided + + + io.cdap.cdap + cdap-formats + ${cdap.version} + + + org.apache.avro + avro + + + io.thekraken + grok + + + + + io.cdap.plugin + hydrator-common + ${hydrator.version} + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + provided + + + commons-logging + commons-logging + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + org.apache.avro + avro + + + org.apache.zookeeper + zookeeper + + + com.google.guava + guava + + + jersey-core + com.sun.jersey + + + jersey-json + com.sun.jersey + + + jersey-server + com.sun.jersey + + + servlet-api + javax.servlet + + + org.mortbay.jetty + jetty + + + org.mortbay.jetty + jetty-util + + + jasper-compiler + tomcat + + + jasper-runtime + tomcat + + + jsp-api + javax.servlet.jsp + + + slf4j-api + org.slf4j + + + + + org.apache.hadoop + hadoop-mapreduce-client-core + ${hadoop.version} + provided + + + org.slf4j + slf4j-log4j12 + + + com.google.inject.extensions + guice-servlet + + + com.sun.jersey + jersey-core + + + com.sun.jersey + jersey-server + + + com.sun.jersey + jersey-json + + + com.sun.jersey.contribs + jersey-guice + + + javax.servlet + servlet-api + + + com.google.guava + guava + + + + + + io.cdap.cdap + cdap-etl-api-spark + ${cdap.version} + provided + + + org.apache.spark + spark-streaming_2.11 + ${spark2.version} + provided + + + org.apache.spark + spark-core_2.11 + ${spark2.version} + provided + + + org.slf4j + slf4j-log4j12 + + + log4j + log4j + + + org.apache.hadoop + hadoop-client + + + com.esotericsoftware.reflectasm + reflectasm + + + org.apache.curator + curator-recipes + + + + org.scala-lang + scala-compiler + + + org.scala-lang + scala-reflect + + + org.eclipse.jetty.orbit + javax.servlet + + + + net.java.dev.jets3t + jets3t + + + asm + asm + + + + + + org.apache.httpcomponents + httpclient + ${httpcomponents.version} + + + com.google.code.gson + gson + 2.8.6 + + + org.apache.commons + commons-csv + 1.7 + + + commons-io + commons-io + 2.6 + + + org.awaitility + awaitility + 4.0.1 + + + + + + + + junit + junit + 4.12 + test + + + com.github.tomakehurst + wiremock-jre8-standalone + 2.25.1 + test + + + + + + + org.apache.rat + apache-rat-plugin + 0.13 + + + org.apache.maven.doxia + doxia-core + 1.6 + + + xerces + xercesImpl + + + + + + + rat-check + validate + + check + + + + LICENSE*.txt + + *.rst + *.md + **/*.cdap + **/*.yaml + **/*.md + logs/** + .git/** + .idea/** + **/grok/patterns/** + conf/** + data/** + plugins/** + **/*.patch + **/logrotate.d/** + **/limits.d/** + **/*.json + **/*.json.template + **/MANIFEST.MF + + **/org/apache/hadoop/** + + **/resources/** + + + + + + + org.apache.maven.plugins + maven-checkstyle-plugin + 2.17 + + + validate + process-test-classes + + checkstyle.xml + suppressions.xml + UTF-8 + true + true + true + **/org/apache/cassandra/**,**/org/apache/hadoop/** + + + check + + + + + + com.puppycrawl.tools + checkstyle + 8.18 + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 8 + 8 + + + + io.cdap + cdap-maven-plugin + 1.1.0 + + + system:cdap-data-pipeline[6.1.0-SNAPSHOT,7.0.0-SNAPSHOT) + + + + + create-artifact-config + prepare-package + + create-plugin-json + + + + + + org.apache.felix + maven-bundle-plugin + 3.5.0 + true + + + <_exportcontents>io.cdap.plugin.marketo.* + *;inline=false;scope=compile + true + lib + + + + + package + + bundle + + + + + + + \ No newline at end of file diff --git a/src/main/java/io/cdap/plugin/marketo/common/api/Helpers.java b/src/main/java/io/cdap/plugin/marketo/common/api/Helpers.java new file mode 100644 index 0000000..bda2387 --- /dev/null +++ b/src/main/java/io/cdap/plugin/marketo/common/api/Helpers.java @@ -0,0 +1,118 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.marketo.common.api; + +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; +import io.cdap.plugin.marketo.common.api.entities.DateRange; +import org.apache.commons.io.IOUtils; +import org.apache.http.NameValuePair; +import org.apache.http.client.utils.URIBuilder; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.time.OffsetDateTime; +import java.util.ArrayList; +import java.util.List; + +/** + * Various helper methods. + */ +public class Helpers { + public static String streamToString(InputStream inputStream) { + try { + return IOUtils.toString(inputStream, StandardCharsets.UTF_8); + } catch (IOException e) { + throw new RuntimeException(String.format("Failed to read stream completely due to '%s'", e.getMessage())); + } + } + + public static T streamToObject(InputStream inputStream, Class cls) { + return Marketo.GSON.fromJson(new InputStreamReader(inputStream), cls); + } + + public static RuntimeException failForMethodAndUri(String method, URI uri, Exception ex) { + String message = ex.getMessage(); + if (Strings.isNullOrEmpty(message)) { + if (ex.getCause() != null) { + message = ex.getCause().getMessage(); + if (Strings.isNullOrEmpty(message)) { + message = "Unknown failure"; + } + } + } + + URIBuilder uriBuilder = new URIBuilder(uri); + List queryParameters = uriBuilder.getQueryParams(); + queryParameters.removeIf(queryParameter -> queryParameter.getName().equals("access_token")); + uriBuilder.setParameters(queryParameters); + try { + String uriString = uriBuilder.build().toString(); + return new RuntimeException(String.format("Failed '%s' '%s' - '%s'", method, uriString, message)); + } catch (URISyntaxException e) { + // this will never happen since we rebuilding already validated uri, just make compiler happy + return new RuntimeException(e); + } + } + + /** + * Splits date range in 30 day ranges. + * Return date range as is if difference is less or equals to 30 days. + * + * @param beginDate + * @param endDate + * @return + */ + public static List getDateRanges(String beginDate, String endDate) { + OffsetDateTime start = OffsetDateTime.parse(beginDate); + OffsetDateTime end = OffsetDateTime.parse(endDate); + + if (start.compareTo(end) > 0) { + throw new IllegalArgumentException("Start date cannot be greater than the end date."); + } + + int compareResult = start.plusDays(30).compareTo(end); + if (compareResult >= 0) { + // we are in range of 30 days, dates are okay + return ImmutableList.of(new DateRange(start.toString(), end.toString())); + } else { + List result = new ArrayList<>(); + OffsetDateTime currentStart = start; + + while (currentStart.compareTo(end) < 0) { + OffsetDateTime nextEnd = currentStart.plusDays(30); + result.add(new DateRange(currentStart.toString(), + min(nextEnd.minusSeconds(1), end).toString())); + currentStart = nextEnd; + } + + return result; + } + } + + public static > T min(T o1, T o2) { + if (o1.compareTo(o2) < 0) { + return o1; + } else { + return o2; + } + } +} diff --git a/src/main/java/io/cdap/plugin/marketo/common/api/Marketo.java b/src/main/java/io/cdap/plugin/marketo/common/api/Marketo.java new file mode 100644 index 0000000..4ccd799 --- /dev/null +++ b/src/main/java/io/cdap/plugin/marketo/common/api/Marketo.java @@ -0,0 +1,162 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.marketo.common.api; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.gson.Gson; +import io.cdap.plugin.marketo.common.api.entities.activities.ActivitiesExport; +import io.cdap.plugin.marketo.common.api.entities.activities.ActivitiesExportRequest; +import io.cdap.plugin.marketo.common.api.entities.activities.ActivitiesExportResponse; +import io.cdap.plugin.marketo.common.api.entities.activities.ActivityTypeResponse; +import io.cdap.plugin.marketo.common.api.entities.leads.LeadsDescribeResponse; +import io.cdap.plugin.marketo.common.api.entities.leads.LeadsExport; +import io.cdap.plugin.marketo.common.api.entities.leads.LeadsExportRequest; +import io.cdap.plugin.marketo.common.api.entities.leads.LeadsExportResponse; +import io.cdap.plugin.marketo.common.api.job.ActivitiesExportJob; +import io.cdap.plugin.marketo.common.api.job.LeadsExportJob; +import org.awaitility.Awaitility; +import org.awaitility.core.ConditionTimeoutException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.InputStream; +import java.util.Collections; +import java.util.List; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * Class that expose marketo rest api endpoints. + */ +public class Marketo extends MarketoHttp { + private static final Logger LOG = LoggerFactory.getLogger(Marketo.class); + static final Gson GSON = new Gson(); + public static final List ACTIVITY_FIELDS = ImmutableList.of("marketoGUID", "leadId", "activityDate", + "activityTypeId", "campaignId", + "primaryAttributeValueId", + "primaryAttributeValue", "attributes"); + /** + * Job queue will be checked every 60 seconds. + */ + private static final long JOB_QUEUE_POLL_INTERVAL = 60; + /** + * Wait for 10 seconds before trying to enqueue job, this will minimize chance of race condition. + */ + private static final long JOB_QUEUE_POLL_DELAY = 10; + + public Marketo(String marketoEndpoint, String clientId, String clientSecret) { + super(marketoEndpoint, clientId, clientSecret); + } + + public List describeLeads() { + return StreamSupport.stream(Spliterators.spliteratorUnknownSize( + iteratePage(Urls.LEADS_DESCRIBE, LeadsDescribeResponse.class, LeadsDescribeResponse::getResult), + Spliterator.ORDERED), false).collect(Collectors.toList()); + } + + public List describeBuildInActivities() { + return StreamSupport.stream(Spliterators.spliteratorUnknownSize( + iteratePage(Urls.BUILD_IN_ACTIVITIES_TYPES, ActivityTypeResponse.class, ActivityTypeResponse::getResult), + Spliterator.ORDERED), false).collect(Collectors.toList()); + } + + public LeadsExportJob exportLeads(LeadsExportRequest request) { + LeadsExportResponse export = validatedPost(Urls.BULK_EXPORT_LEADS_CREATE, Collections.emptyMap(), + Marketo::streamToLeadsExport, + request, + GSON::toJson); + return new LeadsExportJob(export.singleExport(), this); + } + + public LeadsExport leadsExportJobStatus(String jobId) { + LeadsExportResponse currentResp = validatedGet( + String.format(Urls.BULK_EXPORT_LEADS_STATUS, jobId), + Collections.emptyMap(), Marketo::streamToLeadsExport); + return currentResp.singleExport(); + } + + public ActivitiesExportJob exportActivities(ActivitiesExportRequest request) { + ActivitiesExportResponse export = validatedPost(Urls.BULK_EXPORT_ACTIVITIES_CREATE, Collections.emptyMap(), + Marketo::streamToActivitiesExport, + request, + GSON::toJson); + return new ActivitiesExportJob(export.singleExport(), this); + } + + public ActivitiesExport activitiesExportJobStatus(String jobId) { + ActivitiesExportResponse currentResp = validatedGet( + String.format(Urls.BULK_EXPORT_ACTIVITIES_STATUS, jobId), + Collections.emptyMap(), Marketo::streamToActivitiesExport); + return currentResp.singleExport(); + } + + /** + * Waits until bulk extract queue has available slot and executes given action. + * + * @param action action to execute once slot is available + * @param timeoutSeconds timeout in seconds + */ + public void onBulkExtractQueueAvailable(Callable action, long timeoutSeconds) { + try { + Awaitility.given() + .ignoreException(TooManyJobsException.class) // ignore exception in case another reader took our slot + .atMost(timeoutSeconds, TimeUnit.SECONDS) + .pollInterval(JOB_QUEUE_POLL_INTERVAL, TimeUnit.SECONDS) + .pollDelay(JOB_QUEUE_POLL_DELAY, TimeUnit.SECONDS) + .until(action); + } catch (ConditionTimeoutException ex) { + throw new RuntimeException("Failed to get slot in bulk export queue due to timeout"); + } + } + + public static LeadsExportResponse streamToLeadsExport(InputStream inputStream) { + return Helpers.streamToObject(inputStream, LeadsExportResponse.class); + } + + public static ActivitiesExportResponse streamToActivitiesExport(InputStream inputStream) { + return Helpers.streamToObject(inputStream, ActivitiesExportResponse.class); + } + + /** + * Check if job can be enqueued. + * + * @return true, if job can be enqueued + */ + public boolean canEnqueueJob() { + LeadsExportResponse leadsExportResponseJobs = validatedGet(Urls.BULK_EXPORT_LEADS_LIST, + ImmutableMap.of("status", "queued,processing"), + Marketo::streamToLeadsExport + ); + + int jobsInQueue = leadsExportResponseJobs.getResult().size(); + + ActivitiesExportResponse activitiesExportResponceJobs = validatedGet(Urls.BULK_EXPORT_ACTIVITIES_LIST, + ImmutableMap.of("status", "queued,processing"), + Marketo::streamToActivitiesExport + ); + jobsInQueue += activitiesExportResponceJobs.getResult().size(); + + LOG.debug("Jobs in queue: {}", jobsInQueue); + + return jobsInQueue < 10; + } +} diff --git a/src/main/java/io/cdap/plugin/marketo/common/api/MarketoHttp.java b/src/main/java/io/cdap/plugin/marketo/common/api/MarketoHttp.java new file mode 100644 index 0000000..44c8698 --- /dev/null +++ b/src/main/java/io/cdap/plugin/marketo/common/api/MarketoHttp.java @@ -0,0 +1,228 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.marketo.common.api; + +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableMap; +import com.google.gson.Gson; +import io.cdap.plugin.marketo.common.api.entities.BaseResponse; +import io.cdap.plugin.marketo.common.api.entities.Error; +import io.cdap.plugin.marketo.common.api.entities.MarketoToken; +import org.apache.commons.io.IOUtils; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.protocol.HttpClientContext; +import org.apache.http.client.utils.URIBuilder; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +/** + * Class that encapsulates common http functions for marketo rest api. + */ +class MarketoHttp { + private static final Logger LOG = LoggerFactory.getLogger(Marketo.class); + private static final Gson GSON = new Gson(); + private String marketoEndpoint; + private String clientId; + private String clientSecret; + private MarketoToken token; + private HttpClientContext httpClientContext = HttpClientContext.create(); + + MarketoHttp(String marketoEndpoint, String clientId, String clientSecret) { + this.marketoEndpoint = marketoEndpoint; + this.clientId = clientId; + this.clientSecret = clientSecret; + token = refreshToken(); + } + + private T getPage(String queryUrl, Class pageClass) { + return validatedGet(queryUrl, Collections.emptyMap(), + inputStream -> Helpers.streamToObject(inputStream, pageClass)); + } + + T getNextPage(T currentPage, String queryUrl, Class pageClass) { + if (!Strings.isNullOrEmpty(currentPage.getNextPageToken())) { + return validatedGet(queryUrl, + ImmutableMap.of("nextPageToken", currentPage.getNextPageToken()), + inputStream -> Helpers.streamToObject(inputStream, pageClass)); + } + return null; + } + + MarketoPageIterator iteratePage(String queryUrl, + Class pageClass, + Function> resultsGetter) { + return new MarketoPageIterator<>(getPage(queryUrl, pageClass), this, queryUrl, pageClass, resultsGetter); + } + + T validatedGet(String queryUrl, Map parameters, + Function deserializer) { + String logUri = "GET " + buildUri(queryUrl, parameters, false).toString(); + return retryableValidate(logUri, () -> { + URI queryUri = buildUri(queryUrl, parameters, true); + return get(queryUri, deserializer); + }); + } + + public T validatedPost(String queryUrl, Map parameters, + Function deserializer, + B body, Function qSerializer) { + String logUri = "POST " + buildUri(queryUrl, parameters, false).toString(); + return retryableValidate(logUri, () -> { + URI queryUri = buildUri(queryUrl, parameters, true); + return post(queryUri, deserializer, body, qSerializer); + }); + } + + // code: 1029, message: Too many jobs (10) in queue + private T retryableValidate(String logUri, Supplier tryQuery) { + T result = tryQuery.get(); + // check for expired token + if (!result.isSuccess()) { + for (Error error : result.getErrors()) { + if (error.getCode() == 602 && error.getMessage().equals("Access token expired")) { + // refresh token and retry + token = refreshToken(); + LOG.info("Refreshed token"); + return tryQuery.get(); + } + } + } + + // log warnings if required + if (result.getWarnings().size() > 0) { + String warnings = result.getWarnings().stream() + .map(error -> String.format("code: %s, message: %s", error.getCode(), error.getMessage())) + .collect(Collectors.joining("; ")); + LOG.warn("Warnings when calling '{}' - {}", logUri, warnings); + } + + if (!result.isSuccess()) { + String msg = String.format("Errors when calling '%s'", logUri); + // log errors if required + if (result.getErrors().size() > 0) { + String errors = result.getErrors().stream() + .map(error -> String.format("code: %s, message: %s", error.getCode(), error.getMessage())) + .collect(Collectors.joining("; ")); + msg = msg + " - " + errors; + LOG.error(msg); + } + throw mapErrorsToException(result.getErrors(), msg); + } + return result; + } + + private RuntimeException mapErrorsToException(List errors, String defaultMessage) { + if (errors.size() == 1) { + Error e = errors.get(0); + String message = e.getMessage(); + if (e.getCode() == 1029 && message != null && message.contains("many jobs")) { + return new TooManyJobsException(); + } else { + // this error don't require specific handling + return new RuntimeException(defaultMessage); + } + } else { + // something outstanding happened and we have more than one error, we can't handle this in specific way + return new RuntimeException(defaultMessage); + } + } + + public T get(URI uri, Function deserializer) { + try (CloseableHttpClient httpClient = HttpClients.createDefault()) { + HttpGet request = new HttpGet(uri); + try (CloseableHttpResponse response = httpClient.execute(request, httpClientContext)) { + checkResponseCode(response); + return deserializer.apply(response.getEntity().getContent()); + } + } catch (Exception e) { + throw Helpers.failForMethodAndUri("GET", uri, e); + } + } + + private T post(URI uri, Function respDeserializer, B body, Function qSerializer) { + try (CloseableHttpClient httpClient = HttpClients.createDefault()) { + HttpPost request = new HttpPost(uri); + if (body != null) { + Objects.requireNonNull(qSerializer, "body serializer must be specified with body"); + request.setEntity(new StringEntity(qSerializer.apply(body), ContentType.APPLICATION_JSON)); + } + try (CloseableHttpResponse response = httpClient.execute(request, httpClientContext)) { + checkResponseCode(response); + return respDeserializer.apply(response.getEntity().getContent()); + } + } catch (Exception e) { + throw Helpers.failForMethodAndUri("POST", uri, e); + } + } + + private static void checkResponseCode(CloseableHttpResponse response) throws IOException { + int statusCode = response.getStatusLine().getStatusCode(); + if (statusCode >= 300) { + String responseBody = IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8); + throw new RuntimeException(String.format("Http code '%s', response '%s'", statusCode, responseBody)); + } + } + + public URI buildUri(String queryUrl, Map parameters) { + return buildUri(queryUrl, parameters, true); + } + + URI buildUri(String queryUrl, Map parameters, boolean includeToken) { + try { + URIBuilder builder = new URIBuilder(marketoEndpoint + queryUrl); + parameters.forEach(builder::setParameter); + if (includeToken) { + builder.setParameter("access_token", token.getAccessToken()); + } + return builder.build(); + } catch (URISyntaxException e) { + throw new IllegalArgumentException(String.format("'%s' is invalid URI", marketoEndpoint + queryUrl)); + } + } + + MarketoToken getCurrentToken() { + return this.token; + } + + private MarketoToken refreshToken() { + LOG.debug("Requesting marketo token"); + URI getTokenUri = buildUri("/identity/oauth/token", + ImmutableMap.of("grant_type", "client_credentials", "client_id", clientId, + "client_secret", clientSecret), false); + return get(getTokenUri, inputStream -> GSON.fromJson(new InputStreamReader(inputStream), MarketoToken.class)); + } +} diff --git a/src/main/java/io/cdap/plugin/marketo/common/api/MarketoPageIterator.java b/src/main/java/io/cdap/plugin/marketo/common/api/MarketoPageIterator.java new file mode 100644 index 0000000..1de07bf --- /dev/null +++ b/src/main/java/io/cdap/plugin/marketo/common/api/MarketoPageIterator.java @@ -0,0 +1,74 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.marketo.common.api; + +import io.cdap.plugin.marketo.common.api.entities.BaseResponse; + +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.function.Function; + +/** + * Marketo page iterator. + * + * @param type of page response + * @param type of page item entity + */ +public class MarketoPageIterator implements Iterator { + private T currentPage; + private MarketoHttp marketo; + private String queryUrl; + private Class pageClass; + private Function> resultsGetter; + private Iterator currentPageResultIterator; + + MarketoPageIterator(T page, MarketoHttp marketo, String queryUrl, Class pageClass, + Function> resultsGetter) { + this.currentPage = page; + this.marketo = marketo; + this.queryUrl = queryUrl; + this.pageClass = pageClass; + this.resultsGetter = resultsGetter; + currentPageResultIterator = resultsGetter.apply(this.currentPage).iterator(); + } + + @Override + public boolean hasNext() { + if (currentPageResultIterator.hasNext()) { + return true; + } else { + T nextPage = marketo.getNextPage(currentPage, queryUrl, pageClass); + if (nextPage != null) { + currentPage = nextPage; + currentPageResultIterator = resultsGetter.apply(this.currentPage).iterator(); + return hasNext(); + } else { + return false; + } + } + } + + @Override + public I next() { + if (hasNext()) { + return currentPageResultIterator.next(); + } else { + throw new NoSuchElementException(); + } + } +} diff --git a/src/main/java/io/cdap/plugin/marketo/common/api/TooManyJobsException.java b/src/main/java/io/cdap/plugin/marketo/common/api/TooManyJobsException.java new file mode 100644 index 0000000..ca23321 --- /dev/null +++ b/src/main/java/io/cdap/plugin/marketo/common/api/TooManyJobsException.java @@ -0,0 +1,23 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.marketo.common.api; + +/** + * Exception thrown if too many jobs already queued. + */ +public class TooManyJobsException extends RuntimeException { +} diff --git a/src/main/java/io/cdap/plugin/marketo/common/api/Urls.java b/src/main/java/io/cdap/plugin/marketo/common/api/Urls.java new file mode 100644 index 0000000..32479bb --- /dev/null +++ b/src/main/java/io/cdap/plugin/marketo/common/api/Urls.java @@ -0,0 +1,35 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.marketo.common.api; + +/** + * Marketo API urls. + */ +public class Urls { + public static final String LEADS_DESCRIBE = "/rest/v1/leads/describe.json"; + public static final String BULK_EXPORT_LEADS_LIST = "/bulk/v1/leads/export.json"; + public static final String BULK_EXPORT_LEADS_CREATE = "/bulk/v1/leads/export/create.json"; + public static final String BULK_EXPORT_LEADS_ENQUEUE = "/bulk/v1/leads/export/%s/enqueue.json"; + public static final String BULK_EXPORT_LEADS_STATUS = "/bulk/v1/leads/export/%s/status.json"; + public static final String BULK_EXPORT_LEADS_FILE = "/bulk/v1/leads/export/%s/file.json"; + public static final String BULK_EXPORT_ACTIVITIES_LIST = "/bulk/v1/activities/export.json"; + public static final String BULK_EXPORT_ACTIVITIES_CREATE = "/bulk/v1/activities/export/create.json"; + public static final String BULK_EXPORT_ACTIVITIES_ENQUEUE = "/bulk/v1/activities/export/%s/enqueue.json"; + public static final String BULK_EXPORT_ACTIVITIES_STATUS = "/bulk/v1/activities/export/%s/status.json"; + public static final String BULK_EXPORT_ACTIVITIES_FILE = "/bulk/v1/activities/export/%s/file.json"; + public static final String BUILD_IN_ACTIVITIES_TYPES = "/rest/v1/activities/types.json"; +} diff --git a/src/main/java/io/cdap/plugin/marketo/common/api/entities/BaseResponse.java b/src/main/java/io/cdap/plugin/marketo/common/api/entities/BaseResponse.java new file mode 100644 index 0000000..2f679ca --- /dev/null +++ b/src/main/java/io/cdap/plugin/marketo/common/api/entities/BaseResponse.java @@ -0,0 +1,81 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.marketo.common.api.entities; + +import java.util.Collections; +import java.util.List; + +/** + * Represents common parts for all responses. + */ +public class BaseResponse { + + private boolean success = false; + private List errors = Collections.emptyList(); + private List warnings = Collections.emptyList(); + private String requestId; + private boolean moreResult = false; + private String nextPageToken; + + public boolean isSuccess() { + return success; + } + + public void setSuccess(boolean success) { + this.success = success; + } + + public List getErrors() { + return errors; + } + + public void setErrors(List errors) { + this.errors = errors; + } + + public List getWarnings() { + return warnings; + } + + public void setWarnings(List warnings) { + this.warnings = warnings; + } + + public String getRequestId() { + return requestId; + } + + public void setRequestId(String requestId) { + this.requestId = requestId; + } + + public boolean isMoreResult() { + return moreResult; + } + + public void setMoreResult(boolean moreResult) { + this.moreResult = moreResult; + } + + public String getNextPageToken() { + return nextPageToken; + } + + public void setNextPageToken(String nextPageToken) { + this.nextPageToken = nextPageToken; + } +} diff --git a/src/main/java/io/cdap/plugin/marketo/common/api/entities/DateRange.java b/src/main/java/io/cdap/plugin/marketo/common/api/entities/DateRange.java new file mode 100644 index 0000000..9e71503 --- /dev/null +++ b/src/main/java/io/cdap/plugin/marketo/common/api/entities/DateRange.java @@ -0,0 +1,47 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.marketo.common.api.entities; + +/** + * Represents date range. + */ +public class DateRange { + String endAt; + String startAt; + + public DateRange() { + + } + + public DateRange(String startAt, String endAt) { + this.endAt = endAt; + this.startAt = startAt; + } + + public String getEndAt() { + return endAt; + } + + public String getStartAt() { + return startAt; + } + + @Override + public String toString() { + return startAt + " -- " + endAt; + } +} diff --git a/src/main/java/io/cdap/plugin/marketo/common/api/entities/Error.java b/src/main/java/io/cdap/plugin/marketo/common/api/entities/Error.java new file mode 100644 index 0000000..594e9cb --- /dev/null +++ b/src/main/java/io/cdap/plugin/marketo/common/api/entities/Error.java @@ -0,0 +1,46 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.marketo.common.api.entities; + +/** + * Represents error message. + */ +public class Error { + private int code; + private String message; + + public Error(int code, String message) { + this.code = code; + this.message = message; + } + + public Error() { + } + + public int getCode() { + return code; + } + + public String getMessage() { + return message; + } + + @Override + public String toString() { + return String.format("code: %d, message: %s", code, message); + } +} diff --git a/src/main/java/io/cdap/plugin/marketo/common/api/entities/MarketoToken.java b/src/main/java/io/cdap/plugin/marketo/common/api/entities/MarketoToken.java new file mode 100644 index 0000000..258d93f --- /dev/null +++ b/src/main/java/io/cdap/plugin/marketo/common/api/entities/MarketoToken.java @@ -0,0 +1,55 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.marketo.common.api.entities; + +import com.google.gson.annotations.SerializedName; + +/** + * Represents marketo token response. + */ +public class MarketoToken { + @SerializedName("access_token") + private String accessToken; + private String scope; + @SerializedName("expires_in") + private String expiresIn; + @SerializedName("token_type") + private String tokenType; + + public MarketoToken(String accessToken, String scope, String expiresIn, String tokenType) { + this.accessToken = accessToken; + this.scope = scope; + this.expiresIn = expiresIn; + this.tokenType = tokenType; + } + + public String getAccessToken() { + return accessToken; + } + + public String getScope() { + return scope; + } + + public String getExpiresIn() { + return expiresIn; + } + + public String getTokenType() { + return tokenType; + } +} diff --git a/src/main/java/io/cdap/plugin/marketo/common/api/entities/Warning.java b/src/main/java/io/cdap/plugin/marketo/common/api/entities/Warning.java new file mode 100644 index 0000000..b1167fc --- /dev/null +++ b/src/main/java/io/cdap/plugin/marketo/common/api/entities/Warning.java @@ -0,0 +1,46 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.marketo.common.api.entities; + +/** + * Represents warning message. + */ +public class Warning { + private int code; + private String message; + + public Warning(int code, String message) { + this.code = code; + this.message = message; + } + + public Warning() { + } + + public int getCode() { + return code; + } + + public String getMessage() { + return message; + } + + @Override + public String toString() { + return String.format("code: %d, message: %s", code, message); + } +} diff --git a/src/main/java/io/cdap/plugin/marketo/common/api/entities/activities/ActivitiesExport.java b/src/main/java/io/cdap/plugin/marketo/common/api/entities/activities/ActivitiesExport.java new file mode 100644 index 0000000..2cfc72e --- /dev/null +++ b/src/main/java/io/cdap/plugin/marketo/common/api/entities/activities/ActivitiesExport.java @@ -0,0 +1,78 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.marketo.common.api.entities.activities; + +/** + * Represents export response item. + */ +public class ActivitiesExport { + String createdAt; + String errorMsg; + String exportId; + int fileSize; + String fileChecksum; + String finishedAt; + String format; + int numberOfRecords; + String queuedAt; + String startedAt; + String status; + + public String getCreatedAt() { + return createdAt; + } + + public String getErrorMsg() { + return errorMsg; + } + + public String getExportId() { + return exportId; + } + + public int getFileSize() { + return fileSize; + } + + public String getFileChecksum() { + return fileChecksum; + } + + public String getFinishedAt() { + return finishedAt; + } + + public String getFormat() { + return format; + } + + public int getNumberOfRecords() { + return numberOfRecords; + } + + public String getQueuedAt() { + return queuedAt; + } + + public String getStartedAt() { + return startedAt; + } + + public String getStatus() { + return status; + } +} diff --git a/src/main/java/io/cdap/plugin/marketo/common/api/entities/activities/ActivitiesExportRequest.java b/src/main/java/io/cdap/plugin/marketo/common/api/entities/activities/ActivitiesExportRequest.java new file mode 100644 index 0000000..31fc681 --- /dev/null +++ b/src/main/java/io/cdap/plugin/marketo/common/api/entities/activities/ActivitiesExportRequest.java @@ -0,0 +1,36 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.marketo.common.api.entities.activities; + +import java.util.List; +import java.util.Map; + +/** + * Represents activities bulk export request. + */ +public class ActivitiesExportRequest { + + Map columnHeaderNames = null; + List fields = null; + ExportActivityFilter filter = null; + String format = "CSV"; + + public ActivitiesExportRequest(List fields, ExportActivityFilter filter) { + this.fields = fields; + this.filter = filter; + } +} diff --git a/src/main/java/io/cdap/plugin/marketo/common/api/entities/activities/ActivitiesExportResponse.java b/src/main/java/io/cdap/plugin/marketo/common/api/entities/activities/ActivitiesExportResponse.java new file mode 100644 index 0000000..0b19058 --- /dev/null +++ b/src/main/java/io/cdap/plugin/marketo/common/api/entities/activities/ActivitiesExportResponse.java @@ -0,0 +1,43 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.marketo.common.api.entities.activities; + +import io.cdap.plugin.marketo.common.api.entities.BaseResponse; + +import java.util.Collections; +import java.util.List; + +/** + * Represents activities bulk export response. + */ +public class ActivitiesExportResponse extends BaseResponse { + + List result = Collections.emptyList(); + + public ActivitiesExport singleExport() { + if (result.size() != 1) { + throw new IllegalStateException( + String.format("Expected single export job result, but found '%s' results.", result.size())); + } + return result.get(0); + } + + public List getResult() { + return result; + } + +} diff --git a/src/main/java/io/cdap/plugin/marketo/common/api/entities/activities/ActivityTypeResponse.java b/src/main/java/io/cdap/plugin/marketo/common/api/entities/activities/ActivityTypeResponse.java new file mode 100644 index 0000000..b853484 --- /dev/null +++ b/src/main/java/io/cdap/plugin/marketo/common/api/entities/activities/ActivityTypeResponse.java @@ -0,0 +1,95 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.marketo.common.api.entities.activities; + +import io.cdap.plugin.marketo.common.api.entities.BaseResponse; + +import java.util.Collections; +import java.util.List; + +/** + * Activity type response. + */ +public class ActivityTypeResponse extends BaseResponse { + /** + * Activity type attribute. + */ + public static class ActivityTypeAttribute { + private String apiName; + private String dataType; + private String name; + + public String getApiName() { + return apiName; + } + + public String getDataType() { + return dataType; + } + + public String getName() { + return name; + } + + @Override + public String toString() { + return String.format("%s(%s, %s)", getApiName(), getDataType(), getName()); + } + } + + /** + * Attribute type. + */ + public static class ActivityType { + private String apiName; + private List attributes; + private String description; + private Integer id; + private String name; + private ActivityTypeAttribute primaryAttribute; + + public String getApiName() { + return apiName; + } + + public List getAttributes() { + return attributes; + } + + public String getDescription() { + return description; + } + + public Integer getId() { + return id; + } + + public String getName() { + return name; + } + + public ActivityTypeAttribute getPrimaryAttribute() { + return primaryAttribute; + } + } + + List result = Collections.emptyList(); + + public List getResult() { + return result; + } +} diff --git a/src/main/java/io/cdap/plugin/marketo/common/api/entities/activities/ExportActivityFilter.java b/src/main/java/io/cdap/plugin/marketo/common/api/entities/activities/ExportActivityFilter.java new file mode 100644 index 0000000..5e225cf --- /dev/null +++ b/src/main/java/io/cdap/plugin/marketo/common/api/entities/activities/ExportActivityFilter.java @@ -0,0 +1,75 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.marketo.common.api.entities.activities; + +import io.cdap.plugin.marketo.common.api.entities.DateRange; + +import java.util.List; + +/** + * Represents request filter. + */ +public class ExportActivityFilter { + private List activityTypeIds; + private DateRange createdAt; + + /** + * Builder. + */ + public static class Builder { + + private List activityTypeIds = null; + private DateRange createdAt = null; + + public Builder() { + } + + Builder(List activityTypeIds, DateRange createdAt) { + this.activityTypeIds = activityTypeIds; + this.createdAt = createdAt; + } + + public Builder activityTypeIds(List activityTypeIds) { + this.activityTypeIds = activityTypeIds; + return Builder.this; + } + + public Builder addActivityTypeIds(Integer activityTypeIds) { + this.activityTypeIds.add(activityTypeIds); + return Builder.this; + } + + public Builder createdAt(DateRange createdAt) { + this.createdAt = createdAt; + return Builder.this; + } + + public ExportActivityFilter build() { + + return new ExportActivityFilter(this); + } + } + + private ExportActivityFilter(Builder builder) { + this.activityTypeIds = builder.activityTypeIds; + this.createdAt = builder.createdAt; + } + + public static Builder builder() { + return new Builder(); + } +} diff --git a/src/main/java/io/cdap/plugin/marketo/common/api/entities/leads/LeadsDescribeResponse.java b/src/main/java/io/cdap/plugin/marketo/common/api/entities/leads/LeadsDescribeResponse.java new file mode 100644 index 0000000..afd89ec --- /dev/null +++ b/src/main/java/io/cdap/plugin/marketo/common/api/entities/leads/LeadsDescribeResponse.java @@ -0,0 +1,85 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.marketo.common.api.entities.leads; + +import io.cdap.plugin.marketo.common.api.entities.BaseResponse; + +import java.util.Collections; +import java.util.List; + +/** + * Represents leads describe response. + */ +public class LeadsDescribeResponse extends BaseResponse { + /** + * Represents lead field description. + */ + public static class LeadAttribute { + String dataType; + String displayName; + int id; + int length; + LeadMapAttribute rest; + LeadMapAttribute soap; + + public String getDataType() { + return dataType; + } + + public String getDisplayName() { + return displayName; + } + + public int getId() { + return id; + } + + public int getLength() { + return length; + } + + public LeadMapAttribute getRest() { + return rest; + } + + public LeadMapAttribute getSoap() { + return soap; + } + } + + /** + * Represents leads field name. + */ + public static class LeadMapAttribute { + String name; + boolean readOnly = true; + + public String getName() { + return name; + } + + public boolean isReadOnly() { + return readOnly; + } + } + + List result = Collections.emptyList(); + + public List getResult() { + return result; + } +} diff --git a/src/main/java/io/cdap/plugin/marketo/common/api/entities/leads/LeadsExport.java b/src/main/java/io/cdap/plugin/marketo/common/api/entities/leads/LeadsExport.java new file mode 100644 index 0000000..f833207 --- /dev/null +++ b/src/main/java/io/cdap/plugin/marketo/common/api/entities/leads/LeadsExport.java @@ -0,0 +1,78 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.marketo.common.api.entities.leads; + +/** + * Represents export response item. + */ +public class LeadsExport { + String createdAt; + String errorMsg; + String exportId; + int fileSize; + String fileChecksum; + String finishedAt; + String format; + int numberOfRecords; + String queuedAt; + String startedAt; + String status; + + public String getCreatedAt() { + return createdAt; + } + + public String getErrorMsg() { + return errorMsg; + } + + public String getExportId() { + return exportId; + } + + public int getFileSize() { + return fileSize; + } + + public String getFileChecksum() { + return fileChecksum; + } + + public String getFinishedAt() { + return finishedAt; + } + + public String getFormat() { + return format; + } + + public int getNumberOfRecords() { + return numberOfRecords; + } + + public String getQueuedAt() { + return queuedAt; + } + + public String getStartedAt() { + return startedAt; + } + + public String getStatus() { + return status; + } +} diff --git a/src/main/java/io/cdap/plugin/marketo/common/api/entities/leads/LeadsExportRequest.java b/src/main/java/io/cdap/plugin/marketo/common/api/entities/leads/LeadsExportRequest.java new file mode 100644 index 0000000..e2035ba --- /dev/null +++ b/src/main/java/io/cdap/plugin/marketo/common/api/entities/leads/LeadsExportRequest.java @@ -0,0 +1,112 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.marketo.common.api.entities.leads; + +import io.cdap.plugin.marketo.common.api.entities.DateRange; + +import java.util.List; +import java.util.Map; + +/** + * Represents leads bulk export request. + */ +public class LeadsExportRequest { + /** + * Represents request filter. + */ + public static class ExportLeadFilter { + DateRange createdAt = null; + Integer smartListId = null; + String smartListName = null; + Integer staticListId = null; + Integer staticListName = null; + DateRange updatedAt = null; + + /** + * Builder for ExportLeadFilter. + */ + public static class Builder { + private DateRange createdAt = null; + private Integer smartListId = null; + private String smartListName = null; + private Integer staticListId = null; + private Integer staticListName = null; + private DateRange updatedAt = null; + + public Builder() { + } + + public Builder createdAt(DateRange createdAt) { + this.createdAt = createdAt; + return Builder.this; + } + + public Builder smartListId(Integer smartListId) { + this.smartListId = smartListId; + return Builder.this; + } + + public Builder smartListName(String smartListName) { + this.smartListName = smartListName; + return Builder.this; + } + + public Builder staticListId(Integer staticListId) { + this.staticListId = staticListId; + return Builder.this; + } + + public Builder staticListName(Integer staticListName) { + this.staticListName = staticListName; + return Builder.this; + } + + public Builder updatedAt(DateRange updatedAt) { + this.updatedAt = updatedAt; + return Builder.this; + } + + public ExportLeadFilter build() { + + return new ExportLeadFilter(this); + } + } + + private ExportLeadFilter(Builder builder) { + this.createdAt = builder.createdAt; + this.smartListId = builder.smartListId; + this.smartListName = builder.smartListName; + this.staticListId = builder.staticListId; + this.staticListName = builder.staticListName; + this.updatedAt = builder.updatedAt; + } + + public static Builder builder() { + return new Builder(); + } + } + + Map columnHeaderNames = null; + List fields = null; + ExportLeadFilter filter = null; + String format = "CSV"; + + public LeadsExportRequest(List fields, ExportLeadFilter filter) { + this.fields = fields; + this.filter = filter; + } +} diff --git a/src/main/java/io/cdap/plugin/marketo/common/api/entities/leads/LeadsExportResponse.java b/src/main/java/io/cdap/plugin/marketo/common/api/entities/leads/LeadsExportResponse.java new file mode 100644 index 0000000..bb58467 --- /dev/null +++ b/src/main/java/io/cdap/plugin/marketo/common/api/entities/leads/LeadsExportResponse.java @@ -0,0 +1,42 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.marketo.common.api.entities.leads; + +import io.cdap.plugin.marketo.common.api.entities.BaseResponse; + +import java.util.Collections; +import java.util.List; + +/** + * Represents leads bulk export response. + */ +public class LeadsExportResponse extends BaseResponse { + + List result = Collections.emptyList(); + + public LeadsExport singleExport() { + if (result.size() != 1) { + throw new IllegalStateException("Expected single export job result."); + } + return result.get(0); + } + + public List getResult() { + return result; + } + +} diff --git a/src/main/java/io/cdap/plugin/marketo/common/api/job/AbstractBulkExportJob.java b/src/main/java/io/cdap/plugin/marketo/common/api/job/AbstractBulkExportJob.java new file mode 100644 index 0000000..de21ca8 --- /dev/null +++ b/src/main/java/io/cdap/plugin/marketo/common/api/job/AbstractBulkExportJob.java @@ -0,0 +1,138 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.marketo.common.api.job; + +import io.cdap.plugin.marketo.common.api.Helpers; +import io.cdap.plugin.marketo.common.api.Marketo; +import org.slf4j.Logger; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * Base bulk export job wrapper. + * + * @param object that represents job status + */ +public abstract class AbstractBulkExportJob { + private static final List WAIT_ABLE_STATE = Arrays.asList("Queued", "Processing"); + private static final String ENQUEUE_ABLE_STATUS = "Created"; + private static final String COMPLETED_STATUS = "Completed"; + + private String jobId; + private T lastState; + private Marketo marketo; + + /** + * @param jobId job id + * @param lastState last status + * @param marketo marketo api instance + */ + public AbstractBulkExportJob(String jobId, + T lastState, + Marketo marketo) { + + this.jobId = jobId; + this.lastState = lastState; + this.marketo = marketo; + getLogger().info("{} - created job '{}'", getLogPrefix(), this.jobId); + } + + public abstract Logger getLogger(); + + public abstract T getFreshState(); + + public abstract String getStateStatus(T state); + + public abstract String getFileUrlTemplate(); + + public abstract String getLogPrefix(); + + protected abstract T enqueueImpl(); + + public T getLastState() { + return lastState; + } + + public Marketo getMarketo() { + return marketo; + } + + public void waitCompletion() { + if (!WAIT_ABLE_STATE.contains(getStateStatus(getLastState()))) { + throw new IllegalStateException("Job must be enqueued before waiting for completion."); + } + + do { + try { + Thread.sleep(TimeUnit.SECONDS.toMillis(30)); + } catch (InterruptedException e) { + throw new IllegalStateException("Failed to wait for job completion - interrupted"); + } + + T newState = getFreshState(); + logStatusChange(getStateStatus(getLastState()), getStateStatus(newState)); + lastState = newState; + } while (WAIT_ABLE_STATE.contains(getStateStatus(getLastState()))); + + if (!getStateStatus(getLastState()).equals(COMPLETED_STATUS)) { + throw new IllegalStateException("Job expected to be in Completed state, but was in " + + getStateStatus(getLastState())); + } + } + + public boolean enqueue() { + if (!getStateStatus(getLastState()).equals(ENQUEUE_ABLE_STATUS)) { + throw new IllegalStateException("Job must be in Created status before enqueuing, but was in " + + getStateStatus(getLastState())); + } + + if (!marketo.canEnqueueJob()) { + return false; + } + + T newState = enqueueImpl(); + + logStatusChange(getStateStatus(getLastState()), getStateStatus(newState)); + + if (!(getStateStatus(newState).equals("Queued") || getStateStatus(newState).equals("Processing"))) { + throw new IllegalStateException( + String.format("Expected Queued|Processing state for job '%s' but got '%s'", jobId, getStateStatus(newState))); + } + + lastState = newState; + + return true; + } + + private void logStatusChange(String oldStatus, String newStatus) { + if (!oldStatus.equals(newStatus)) { + getLogger().info("{} - job '{}' changed state '{}' -> '{}'", getLogPrefix(), jobId, oldStatus, newStatus); + } + } + + public String getFile() { + return marketo.get(marketo.buildUri(String.format(getFileUrlTemplate(), jobId), Collections.emptyMap()), + Helpers::streamToString); + } + + public String getJobId() { + return jobId; + } +} diff --git a/src/main/java/io/cdap/plugin/marketo/common/api/job/ActivitiesExportJob.java b/src/main/java/io/cdap/plugin/marketo/common/api/job/ActivitiesExportJob.java new file mode 100644 index 0000000..7a6c35d --- /dev/null +++ b/src/main/java/io/cdap/plugin/marketo/common/api/job/ActivitiesExportJob.java @@ -0,0 +1,72 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.marketo.common.api.job; + +import io.cdap.plugin.marketo.common.api.Helpers; +import io.cdap.plugin.marketo.common.api.Marketo; +import io.cdap.plugin.marketo.common.api.Urls; +import io.cdap.plugin.marketo.common.api.entities.activities.ActivitiesExport; +import io.cdap.plugin.marketo.common.api.entities.activities.ActivitiesExportResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; + +/** + * Activities export job. + */ +public class ActivitiesExportJob extends AbstractBulkExportJob { + private static final Logger LOG = LoggerFactory.getLogger(ActivitiesExportJob.class); + + public ActivitiesExportJob(ActivitiesExport lastState, Marketo marketo) { + super(lastState.getExportId(), lastState, marketo); + } + + @Override + public Logger getLogger() { + return LOG; + } + + @Override + public ActivitiesExport getFreshState() { + return getMarketo().activitiesExportJobStatus(getJobId()); + } + + @Override + public String getStateStatus(ActivitiesExport state) { + return state.getStatus(); + } + + @Override + public String getFileUrlTemplate() { + return Urls.BULK_EXPORT_ACTIVITIES_FILE; + } + + @Override + public String getLogPrefix() { + return "BULK ACTIVITIES EXPORT"; + } + + @Override + protected ActivitiesExport enqueueImpl() { + return getMarketo().validatedPost( + String.format(Urls.BULK_EXPORT_ACTIVITIES_ENQUEUE, getJobId()), + Collections.emptyMap(), + inputStream -> Helpers.streamToObject(inputStream, ActivitiesExportResponse.class), + null, null).singleExport(); + } +} diff --git a/src/main/java/io/cdap/plugin/marketo/common/api/job/LeadsExportJob.java b/src/main/java/io/cdap/plugin/marketo/common/api/job/LeadsExportJob.java new file mode 100644 index 0000000..f96587f --- /dev/null +++ b/src/main/java/io/cdap/plugin/marketo/common/api/job/LeadsExportJob.java @@ -0,0 +1,72 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.marketo.common.api.job; + +import io.cdap.plugin.marketo.common.api.Helpers; +import io.cdap.plugin.marketo.common.api.Marketo; +import io.cdap.plugin.marketo.common.api.Urls; +import io.cdap.plugin.marketo.common.api.entities.leads.LeadsExport; +import io.cdap.plugin.marketo.common.api.entities.leads.LeadsExportResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; + +/** + * Leads export job. + */ +public class LeadsExportJob extends AbstractBulkExportJob { + private static final Logger LOG = LoggerFactory.getLogger(LeadsExportJob.class); + + public LeadsExportJob(LeadsExport lastState, Marketo marketo) { + super(lastState.getExportId(), lastState, marketo); + } + + @Override + public Logger getLogger() { + return LOG; + } + + @Override + public LeadsExport getFreshState() { + return getMarketo().leadsExportJobStatus(getJobId()); + } + + @Override + public String getStateStatus(LeadsExport state) { + return state.getStatus(); + } + + @Override + public String getFileUrlTemplate() { + return Urls.BULK_EXPORT_LEADS_FILE; + } + + @Override + public String getLogPrefix() { + return "BULK LEADS EXPORT"; + } + + @Override + protected LeadsExport enqueueImpl() { + return getMarketo().validatedPost( + String.format(Urls.BULK_EXPORT_LEADS_ENQUEUE, getJobId()), + Collections.emptyMap(), + inputStream -> Helpers.streamToObject(inputStream, LeadsExportResponse.class), + null, null).singleExport(); + } +} diff --git a/src/main/java/io/cdap/plugin/marketo/source/batch/MarketoInputFormat.java b/src/main/java/io/cdap/plugin/marketo/source/batch/MarketoInputFormat.java new file mode 100644 index 0000000..096b842 --- /dev/null +++ b/src/main/java/io/cdap/plugin/marketo/source/batch/MarketoInputFormat.java @@ -0,0 +1,53 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.marketo.source.batch; + +import com.google.gson.Gson; +import io.cdap.plugin.marketo.common.api.Helpers; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * InputFormat for mapreduce job, which provides a single split of data. + */ +public class MarketoInputFormat extends InputFormat { + private static final Gson GSON = new Gson(); + + @Override + public List getSplits(JobContext jobContext) { + Configuration conf = jobContext.getConfiguration(); + MarketoReportingSourceConfig config = GSON.fromJson( + conf.get(MarketoInputFormatProvider.PROPERTY_CONFIG_JSON), MarketoReportingSourceConfig.class); + + return Helpers.getDateRanges(config.getStartDate(), config.getEndDate()).stream() + .map(dateRange -> new MarketoReportingSplit(dateRange.getStartAt(), dateRange.getEndAt())) + .collect(Collectors.toList()); + } + + @Override + public RecordReader createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) { + MarketoReportingSplit split = (MarketoReportingSplit) inputSplit; + return new MarketoRecordReader(split.getBeginDate(), split.getEndDate()); + } +} diff --git a/src/main/java/io/cdap/plugin/marketo/source/batch/MarketoInputFormatProvider.java b/src/main/java/io/cdap/plugin/marketo/source/batch/MarketoInputFormatProvider.java new file mode 100644 index 0000000..8a70ed1 --- /dev/null +++ b/src/main/java/io/cdap/plugin/marketo/source/batch/MarketoInputFormatProvider.java @@ -0,0 +1,51 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.marketo.source.batch; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import io.cdap.cdap.api.data.batch.InputFormatProvider; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * InputFormatProvider used by cdap to provide configurations to mapreduce job + */ +public class MarketoInputFormatProvider implements InputFormatProvider { + public static final String PROPERTY_CONFIG_JSON = "cdap.marketo.reporter.config"; + private static final Gson gson = new GsonBuilder().create(); + private final Map conf; + + + MarketoInputFormatProvider(MarketoReportingSourceConfig config) { + this.conf = Collections.unmodifiableMap(new HashMap() {{ + put(PROPERTY_CONFIG_JSON, gson.toJson(config)); + }}); + } + + @Override + public String getInputFormatClassName() { + return MarketoInputFormat.class.getName(); + } + + @Override + public Map getInputFormatConfiguration() { + return conf; + } +} diff --git a/src/main/java/io/cdap/plugin/marketo/source/batch/MarketoRecordReader.java b/src/main/java/io/cdap/plugin/marketo/source/batch/MarketoRecordReader.java new file mode 100644 index 0000000..4af14f6 --- /dev/null +++ b/src/main/java/io/cdap/plugin/marketo/source/batch/MarketoRecordReader.java @@ -0,0 +1,138 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.marketo.source.batch; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.plugin.marketo.common.api.Marketo; +import io.cdap.plugin.marketo.common.api.entities.DateRange; +import io.cdap.plugin.marketo.common.api.entities.activities.ActivitiesExportRequest; +import io.cdap.plugin.marketo.common.api.entities.activities.ExportActivityFilter; +import io.cdap.plugin.marketo.common.api.entities.leads.LeadsExportRequest; +import io.cdap.plugin.marketo.common.api.job.AbstractBulkExportJob; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVParser; +import org.apache.commons.csv.CSVRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.StringReader; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * RecordReader implementation, which reads events from Marketo api. + */ +public class MarketoRecordReader extends RecordReader> { + private static final Logger LOG = LoggerFactory.getLogger(MarketoRecordReader.class); + private static final Gson GSON = new GsonBuilder().create(); + /** + * Wait for 25 minutes for available slot in queue. + */ + private static final long JOB_ENQUEUE_TIMEOUT = 25 * 60; + private Map current = null; + private Iterator iterator = null; + private String beginDate; + private String endDate; + + public MarketoRecordReader(String beginDate, String endDate) { + this.beginDate = beginDate; + this.endDate = endDate; + } + + @Override + public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException { + Configuration conf = taskAttemptContext.getConfiguration(); + String configJson = conf.get(MarketoInputFormatProvider.PROPERTY_CONFIG_JSON); + MarketoReportingSourceConfig config = GSON.fromJson(configJson, MarketoReportingSourceConfig.class); + + Marketo marketo = config.getMarketo(); + + DateRange dateRange = new DateRange(beginDate, endDate); + AbstractBulkExportJob job; + switch (config.getReportType()) { + case LEADS: + List leadsFields = config.getSchema().getFields().stream() + .map(Schema.Field::getName) + .collect(Collectors.toList()); + + LeadsExportRequest.ExportLeadFilter leadsFilter = LeadsExportRequest.ExportLeadFilter.builder() + .createdAt(dateRange) + .build(); + + job = marketo.exportLeads(new LeadsExportRequest(leadsFields, leadsFilter)); + break; + case ACTIVITIES: + ExportActivityFilter activitiesFilter = ExportActivityFilter.builder() + .createdAt(dateRange) + .build(); + + job = marketo.exportActivities(new ActivitiesExportRequest(Marketo.ACTIVITY_FIELDS, activitiesFilter)); + break; + default: + throw new IllegalArgumentException("Invalid report type " + config.getReportType()); + } + + LOG.info("BULK EXPORT JOB - job '{}' has date range '{}'", job.getJobId(), dateRange); + + // wait for 25 minutes for available slot and enqueue job + marketo.onBulkExtractQueueAvailable(job::enqueue, JOB_ENQUEUE_TIMEOUT); + + job.waitCompletion(); + + String data = job.getFile(); + CSVParser parser = CSVFormat.DEFAULT.withHeader().parse(new StringReader(data)); + iterator = parser.iterator(); + } + + @Override + public boolean nextKeyValue() { + if (iterator.hasNext()) { + current = iterator.next().toMap(); + return true; + } + return false; + } + + @Override + public NullWritable getCurrentKey() { + return null; + } + + @Override + public Map getCurrentValue() { + return current; + } + + @Override + public float getProgress() { + return 0; + } + + @Override + public void close() { + } +} diff --git a/src/main/java/io/cdap/plugin/marketo/source/batch/MarketoReportingPlugin.java b/src/main/java/io/cdap/plugin/marketo/source/batch/MarketoReportingPlugin.java new file mode 100644 index 0000000..585814d --- /dev/null +++ b/src/main/java/io/cdap/plugin/marketo/source/batch/MarketoReportingPlugin.java @@ -0,0 +1,82 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.marketo.source.batch; + +import io.cdap.cdap.api.annotation.Description; +import io.cdap.cdap.api.annotation.Name; +import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.api.data.batch.Input; +import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.dataset.lib.KeyValue; +import io.cdap.cdap.etl.api.Emitter; +import io.cdap.cdap.etl.api.FailureCollector; +import io.cdap.cdap.etl.api.PipelineConfigurer; +import io.cdap.cdap.etl.api.batch.BatchSource; +import io.cdap.cdap.etl.api.batch.BatchSourceContext; +import io.cdap.plugin.common.IdUtils; +import io.cdap.plugin.common.LineageRecorder; +import org.apache.hadoop.io.NullWritable; + +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * Plugin that reads entities from Marketo api. + */ +@Plugin(type = BatchSource.PLUGIN_TYPE) +@Name(MarketoReportingPlugin.NAME) +@Description("Reads Leads or Activities from Marketo.") +public class MarketoReportingPlugin extends BatchSource, StructuredRecord> { + public static final String NAME = "MarketoReportingPlugin"; + + private final MarketoReportingSourceConfig config; + + public MarketoReportingPlugin(MarketoReportingSourceConfig config) { + this.config = config; + } + + @Override + public void configurePipeline(PipelineConfigurer pipelineConfigurer) { + validateConfiguration(pipelineConfigurer.getStageConfigurer().getFailureCollector()); + pipelineConfigurer.getStageConfigurer().setOutputSchema(config.getSchema()); + } + + @Override + public void prepareRun(BatchSourceContext batchSourceContext) { + validateConfiguration(batchSourceContext.getFailureCollector()); + LineageRecorder lineageRecorder = new LineageRecorder(batchSourceContext, config.referenceName); + lineageRecorder.createExternalDataset(config.getSchema()); + lineageRecorder.recordRead("Read", "Reading Marketo entities", + Objects.requireNonNull(config.getSchema().getFields()).stream() + .map(Schema.Field::getName) + .collect(Collectors.toList())); + + batchSourceContext.setInput(Input.of(config.referenceName, new MarketoInputFormatProvider(config))); + } + + @Override + public void transform(KeyValue> input, Emitter emitter) { + emitter.emit(MarketoReportingSchemaHelper.getRecord(config.getSchema(), input.getValue())); + } + + private void validateConfiguration(FailureCollector failureCollector) { + config.validate(failureCollector); + failureCollector.getOrThrowException(); + } +} diff --git a/src/main/java/io/cdap/plugin/marketo/source/batch/MarketoReportingSchemaHelper.java b/src/main/java/io/cdap/plugin/marketo/source/batch/MarketoReportingSchemaHelper.java new file mode 100644 index 0000000..0a578fd --- /dev/null +++ b/src/main/java/io/cdap/plugin/marketo/source/batch/MarketoReportingSchemaHelper.java @@ -0,0 +1,71 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.marketo.source.batch; + +import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.plugin.marketo.common.api.Marketo; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * Various methods to deal with schema and record. + */ +public class MarketoReportingSchemaHelper { + public static Schema getActivitySchema() { + List fields = Marketo.ACTIVITY_FIELDS.stream() + .map(s -> Schema.Field.of(s, Schema.nullableOf(Schema.of(Schema.Type.STRING)))) + .collect(Collectors.toList()); + return Schema.recordOf("activityRecord", fields); + } + + public static Schema getSchema(MarketoReportingSourceConfig config) { + switch (config.getReportType()) { + case LEADS: + List fields = config.getMarketo().describeLeads().stream().map( + leadAttribute -> { + if (leadAttribute.getRest() != null) { + return Schema.Field.of(leadAttribute.getRest().getName(), + Schema.nullableOf(Schema.of(Schema.Type.STRING))); + } else { + return null; + } + } + ).filter(Objects::nonNull).collect(Collectors.toList()); + + return Schema.recordOf("LeadsRecord", fields); + case ACTIVITIES: + return getActivitySchema(); + } + throw new IllegalArgumentException("Failed to get schema for type " + config.getReportType()); + } + + public static StructuredRecord getRecord(Schema schema, Map fields) { + StructuredRecord.Builder builder = StructuredRecord.builder(schema); + schema.getFields().forEach( + field -> { + if (fields.containsKey(field.getName())) { + builder.set(field.getName(), fields.get(field.getName())); + } + } + ); + return builder.build(); + } +} diff --git a/src/main/java/io/cdap/plugin/marketo/source/batch/MarketoReportingSourceConfig.java b/src/main/java/io/cdap/plugin/marketo/source/batch/MarketoReportingSourceConfig.java new file mode 100644 index 0000000..e7282aa --- /dev/null +++ b/src/main/java/io/cdap/plugin/marketo/source/batch/MarketoReportingSourceConfig.java @@ -0,0 +1,199 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.marketo.source.batch; + +import com.google.common.base.Strings; +import io.cdap.cdap.api.annotation.Description; +import io.cdap.cdap.api.annotation.Macro; +import io.cdap.cdap.api.annotation.Name; +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.etl.api.FailureCollector; +import io.cdap.plugin.common.IdUtils; +import io.cdap.plugin.common.ReferencePluginConfig; +import io.cdap.plugin.marketo.common.api.Marketo; + +import java.net.MalformedURLException; +import java.net.URL; +import java.time.OffsetDateTime; +import java.time.format.DateTimeParseException; + +/** + * Provides all required configuration for reading Marketo entities. + */ +public class MarketoReportingSourceConfig extends ReferencePluginConfig { + public static final String PROPERTY_CLIENT_ID = "clientId"; + public static final String PROPERTY_CLIENT_SECRET = "clientSecret"; + public static final String PROPERTY_REST_API_ENDPOINT = "restApiEndpoint"; + public static final String PROPERTY_REPORT_TYPE = "reportType"; + public static final String PROPERTY_START_DATE = "startDate"; + public static final String PROPERTY_END_DATE = "endDate"; + + @Name(PROPERTY_CLIENT_ID) + @Description("Marketo Client ID.") + @Macro + protected String clientId; + + @Name(PROPERTY_CLIENT_SECRET) + @Description("Marketo Client secret.") + @Macro + protected String clientSecret; + + @Name(PROPERTY_REST_API_ENDPOINT) + @Description("REST API endpoint URL.") + @Macro + protected String restApiEndpoint; + + @Name(PROPERTY_REPORT_TYPE) + @Description("Report type format, leads or activities.") + @Macro + protected String reportType; + + @Name(PROPERTY_START_DATE) + @Description("Start date for the report.") + @Macro + protected String startDate; + + @Name(PROPERTY_END_DATE) + @Description("End date for the report.") + @Macro + protected String endDate; + + private transient Schema schema = null; + private transient Marketo marketo = null; + + public MarketoReportingSourceConfig(String referenceName) { + super(referenceName); + } + + public Schema getSchema() { + if (schema == null) { + schema = MarketoReportingSchemaHelper.getSchema(this); + } + return schema; + } + + public Marketo getMarketo() { + if (marketo == null) { + marketo = new Marketo(getRestApiEndpoint(), getClientId(), getClientSecret()); + } + return marketo; + } + + public String getClientId() { + return clientId; + } + + public String getClientSecret() { + return clientSecret; + } + + public String getRestApiEndpoint() { + return restApiEndpoint; + } + + public String getStartDate() { + return startDate; + } + + public String getEndDate() { + return endDate; + } + + public ReportType getReportType() { + return ReportType.fromString(reportType); + } + + void validate(FailureCollector failureCollector) { + IdUtils.validateReferenceName(referenceName, failureCollector); + validateDate(failureCollector); + validateReportType(failureCollector); + validateMarketoEndpoint(failureCollector); + validateSecrets(failureCollector); + } + + void validateDate(FailureCollector failureCollector) { + if (!containsMacro(PROPERTY_START_DATE)) { + try { + OffsetDateTime.parse(getStartDate()); + } catch (DateTimeParseException ex) { + failureCollector.addFailure("Failed to parse start date.", + "Correct date to ISO 8601 format.") + .withConfigProperty(PROPERTY_START_DATE); + } + } + + if (!containsMacro(PROPERTY_END_DATE)) { + try { + OffsetDateTime.parse(getStartDate()); + } catch (DateTimeParseException ex) { + failureCollector.addFailure("Failed to parse end date.", + "Correct date to ISO 8601 format.") + .withConfigProperty(PROPERTY_END_DATE); + } + } + + if (!(containsMacro(PROPERTY_START_DATE) && containsMacro(PROPERTY_END_DATE))) { + try { + OffsetDateTime start = OffsetDateTime.parse(getStartDate()); + OffsetDateTime end = OffsetDateTime.parse(getEndDate()); + + if (start.compareTo(end) > 0) { + failureCollector.addFailure("Start date cannot be greater than the end date.", "Swap dates.") + .withConfigProperty(PROPERTY_START_DATE).withConfigProperty(PROPERTY_END_DATE); + } + } catch (DateTimeParseException ex) { + // silently ignore parsing exceptions, we already pushed messages for malformed dates + } + } + } + + void validateReportType(FailureCollector failureCollector) { + if (!containsMacro(PROPERTY_REPORT_TYPE)) { + try { + getReportType(); + } catch (IllegalArgumentException ex) { + failureCollector.addFailure(String.format("Incorrect reporting type '%s'.", reportType), + "Set reporting type to 'activities' or 'leads'.") + .withConfigProperty(PROPERTY_REPORT_TYPE); + } + } + } + + void validateSecrets(FailureCollector failureCollector) { + if (!containsMacro(PROPERTY_CLIENT_ID) && Strings.isNullOrEmpty(getClientId())) { + failureCollector.addFailure("Client ID is empty.", null) + .withConfigProperty(PROPERTY_CLIENT_ID); + } + + if (!containsMacro(PROPERTY_CLIENT_SECRET) && Strings.isNullOrEmpty(getClientSecret())) { + failureCollector.addFailure("Client Secret is empty.", null) + .withConfigProperty(PROPERTY_CLIENT_SECRET); + } + } + + void validateMarketoEndpoint(FailureCollector failureCollector) { + if (!containsMacro(PROPERTY_REST_API_ENDPOINT)) { + try { + new URL(getRestApiEndpoint()); + } catch (MalformedURLException e) { + failureCollector + .addFailure(String.format("Malformed Marketo Rest API endpoint URL '%s'.", getRestApiEndpoint()), null) + .withConfigProperty(PROPERTY_REST_API_ENDPOINT); + } + } + } +} diff --git a/src/main/java/io/cdap/plugin/marketo/source/batch/MarketoReportingSplit.java b/src/main/java/io/cdap/plugin/marketo/source/batch/MarketoReportingSplit.java new file mode 100644 index 0000000..7e12e27 --- /dev/null +++ b/src/main/java/io/cdap/plugin/marketo/source/batch/MarketoReportingSplit.java @@ -0,0 +1,70 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.marketo.source.batch; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputSplit; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * A no-op split. + */ +public class MarketoReportingSplit extends InputSplit implements Writable { + private String beginDate; + private String endDate; + + public MarketoReportingSplit() { + } + + public MarketoReportingSplit(String beginDate, String endDate) { + this.beginDate = beginDate; + this.endDate = endDate; + } + + @Override + public void readFields(DataInput dataInput) throws IOException { + beginDate = dataInput.readUTF(); + endDate = dataInput.readUTF(); + } + + @Override + public void write(DataOutput dataOutput) throws IOException { + dataOutput.writeUTF(beginDate); + dataOutput.writeUTF(endDate); + } + + @Override + public long getLength() { + return 0; + } + + @Override + public String[] getLocations() { + return new String[0]; + } + + public String getBeginDate() { + return beginDate; + } + + public String getEndDate() { + return endDate; + } +} diff --git a/src/main/java/io/cdap/plugin/marketo/source/batch/ReportType.java b/src/main/java/io/cdap/plugin/marketo/source/batch/ReportType.java new file mode 100644 index 0000000..13193f0 --- /dev/null +++ b/src/main/java/io/cdap/plugin/marketo/source/batch/ReportType.java @@ -0,0 +1,40 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.marketo.source.batch; + +/** + * Represents report type. + */ +public enum ReportType { + LEADS("leads"), + ACTIVITIES("activities"); + + private String type; + + ReportType(String type) { + this.type = type; + } + + public static ReportType fromString(String reportType) { + for (ReportType rt : ReportType.values()) { + if (rt.type.equals(reportType)) { + return rt; + } + } + throw new IllegalArgumentException("Unknown report type: " + reportType); + } +} diff --git a/src/test/java/io/cdap/plugin/marketo/common/api/MarketoHttpTest.java b/src/test/java/io/cdap/plugin/marketo/common/api/MarketoHttpTest.java new file mode 100644 index 0000000..80c7071 --- /dev/null +++ b/src/test/java/io/cdap/plugin/marketo/common/api/MarketoHttpTest.java @@ -0,0 +1,291 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.marketo.common.api; + +import com.github.tomakehurst.wiremock.client.WireMock; +import com.github.tomakehurst.wiremock.core.WireMockConfiguration; +import com.github.tomakehurst.wiremock.junit.WireMockRule; +import com.github.tomakehurst.wiremock.stubbing.Scenario; +import com.google.common.collect.ImmutableMap; +import com.google.gson.Gson; +import io.cdap.plugin.marketo.common.api.entities.BaseResponse; +import io.cdap.plugin.marketo.common.api.entities.Error; +import io.cdap.plugin.marketo.common.api.entities.MarketoToken; +import io.cdap.plugin.marketo.common.api.entities.Warning; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +public class MarketoHttpTest { + private static final Gson GSON = new Gson(); + + @Rule + public WireMockRule wireMockRule = new WireMockRule( + WireMockConfiguration.wireMockConfig().dynamicPort() + ); + + public static class StubResponse extends BaseResponse { + StubResponse(boolean success, List errors, List warnings) { + setSuccess(success); + setErrors(errors); + setWarnings(warnings); + } + } + + public static class PageResponse extends BaseResponse { + private List results; + + PageResponse(boolean moreResults, String nextPageToken, String... items) { + setSuccess(true); + setMoreResult(moreResults); + results = Arrays.asList(items); + setNextPageToken(nextPageToken); + } + + public List getResults() { + return results; + } + } + + @Test + public void testPaging() { + setupToken(); + + WireMock.stubFor( + WireMock.get(WireMock.urlPathMatching("/rest/v1/paged.json")).inScenario("page") + .whenScenarioStateIs(Scenario.STARTED) + .willReturn( + WireMock.aResponse().withBody( + GSON.toJson(new PageResponse(true, "page1", "1", "2", "3")) + ) + ) + .willSetStateTo("page1") + ); + WireMock.stubFor( + WireMock.get(WireMock.urlPathMatching("/rest/v1/paged.json")).inScenario("page") + .whenScenarioStateIs("page1") + .withQueryParam("nextPageToken", WireMock.equalTo("page1")) + .willReturn( + WireMock.aResponse().withBody( + GSON.toJson(new PageResponse(true, "page2", "4", "5", "6")) + ) + ) + .willSetStateTo("page2") + ); + WireMock.stubFor( + WireMock.get(WireMock.urlPathMatching("/rest/v1/paged.json")).inScenario("page") + .whenScenarioStateIs("page2") + .withQueryParam("nextPageToken", WireMock.equalTo("page2")) + .willReturn( + WireMock.aResponse().withBody( + GSON.toJson(new PageResponse(false, null, "7", "8", "9")) + ) + ) + .willSetStateTo("page3") + ); + + MarketoHttp m = new MarketoHttp(getApiUrl(), "clientNiceId", "clientNiceSecret"); + + List results = StreamSupport.stream(Spliterators.spliteratorUnknownSize( + m.iteratePage("/rest/v1/paged.json", PageResponse.class, PageResponse::getResults), + Spliterator.ORDERED), false).sorted().collect(Collectors.toList()); + + Assert.assertArrayEquals(new String[]{"1", "2", "3", "4", "5", "6", "7", "8", "9"}, results.toArray()); + } + + @Test + public void testToken() { + WireMock.stubFor( + WireMock.get(WireMock.urlPathMatching("/identity/oauth/token")) + .withQueryParam("grant_type", WireMock.equalTo("client_credentials")) + .withQueryParam("client_id", WireMock.equalTo("clientNiceId")) + .withQueryParam("client_secret", WireMock.equalTo("clientNiceSecret")) + .willReturn( + WireMock.aResponse().withBody( + GSON.toJson(new MarketoToken("niceToken", "hello@world.com", "3600", "bearer") + ) + ) + ) + ); + + MarketoHttp m = new MarketoHttp(getApiUrl(), "clientNiceId", "clientNiceSecret"); + Assert.assertEquals("niceToken", m.getCurrentToken().getAccessToken()); + } + + @Test + public void testTokenRefresh() { + setupToken(); + + WireMock.stubFor( + WireMock.get(WireMock.urlPathMatching("/rest/v1/stub.json")).inScenario("retry") + .whenScenarioStateIs(Scenario.STARTED) + .willReturn( + WireMock.aResponse().withBody( + GSON.toJson(new StubResponse(false, + Collections.singletonList( + new Error(602, "Access token expired")), + Collections.emptyList())) + ) + ) + .willSetStateTo("refreshed") + ); + WireMock.stubFor( + WireMock.get(WireMock.urlPathMatching("/rest/v1/stub.json")).inScenario("retry") + .whenScenarioStateIs("refreshed") + .willReturn( + WireMock.aResponse().withBody( + GSON.toJson(new StubResponse(true, Collections.emptyList(), Collections.emptyList())) + ) + ) + ); + MarketoHttp m = new MarketoHttp(getApiUrl(), "clientNiceId", "clientNiceSecret"); + m.validatedGet("/rest/v1/stub.json", Collections.emptyMap(), + inputStream -> Helpers.streamToObject(inputStream, StubResponse.class)); + WireMock.verify(WireMock.exactly(2), + WireMock.getRequestedFor(WireMock.urlPathEqualTo("/identity/oauth/token"))); + WireMock.verify(WireMock.exactly(2), + WireMock.getRequestedFor(WireMock.urlPathEqualTo("/rest/v1/stub.json"))); + } + + @Test + public void testPost() { + setupToken(); + + WireMock.stubFor( + WireMock.post(WireMock.urlPathMatching("/rest/v1/post.json")) + .willReturn( + WireMock.aResponse().withBody( + GSON.toJson(new StubResponse(true, Collections.emptyList(), Collections.emptyList())) + ) + ) + ); + + MarketoHttp m = new MarketoHttp(getApiUrl(), "clientNiceId", "clientNiceSecret"); + m.validatedPost("/rest/v1/post.json", Collections.emptyMap(), + inputStream -> Helpers.streamToObject(inputStream, StubResponse.class), "body", String::toString); + + WireMock.verify( + WireMock.postRequestedFor(WireMock.urlPathEqualTo("/rest/v1/post.json")) + .withRequestBody(WireMock.equalTo("body")) + ); + } + + @Test + public void testMessages() { + setupToken(); + + WireMock.stubFor( + WireMock.get(WireMock.urlPathMatching("/rest/v1/justWarnings.json")) + .willReturn( + WireMock.aResponse().withBody( + GSON.toJson(new StubResponse(true, Collections.emptyList(), Arrays.asList( + new Warning(700, "Reversed agent 007"), + new Warning(777, "Result of 1000 - 333") + )))) + ) + ); + + WireMock.stubFor( + WireMock.get(WireMock.urlPathMatching("/rest/v1/errors.json")) + .willReturn( + WireMock.aResponse().withBody( + GSON.toJson(new StubResponse(false, + Collections.singletonList(new Error(123, "No way")), + Collections.emptyList()))) + ) + ); + + MarketoHttp m = new MarketoHttp(getApiUrl(), "clientNiceId", "clientNiceSecret"); + m.validatedGet("/rest/v1/justWarnings.json", Collections.emptyMap(), + inputStream -> Helpers.streamToObject(inputStream, StubResponse.class)); + + try { + m.validatedGet("/rest/v1/errors.json", Collections.emptyMap(), + inputStream -> Helpers.streamToObject(inputStream, StubResponse.class)); + Assert.fail("This call expected to fail."); + } catch (RuntimeException ex) { + Assert.assertTrue(ex.getMessage().contains("123")); + Assert.assertTrue(ex.getMessage().contains("No way")); + } + } + + @Test + public void testBuildUri() { + setupToken(); + MarketoHttp m = new MarketoHttp(getApiUrl(), "clientNiceId", "clientNiceSecret"); + String uriWithToken = m.buildUri("/hello", Collections.emptyMap()).toString(); + Assert.assertTrue(uriWithToken.contains("access_token")); + String uriWithoutToken = m.buildUri("/hello", ImmutableMap.of("param", "value"), false).toString(); + Assert.assertFalse(uriWithoutToken.contains("access_token")); + Assert.assertTrue(uriWithoutToken.contains("param=value")); + } + + @Test + public void testHttpError() { + setupToken(); + + WireMock.stubFor( + WireMock.get(WireMock.urlPathMatching("/rest/v1/fail.json")) + .willReturn( + WireMock.aResponse().withStatus(500).withBody("GJ server.") + ) + ); + + try { + MarketoHttp m = new MarketoHttp(getApiUrl(), "clientNiceId", "clientNiceSecret"); + m.validatedGet("/rest/v1/fail.json", Collections.emptyMap(), + inputStream -> Helpers.streamToObject(inputStream, StubResponse.class)); + Assert.fail("This call expected to fail."); + } catch (RuntimeException ex) { + Assert.assertTrue(ex.getMessage().contains("GJ server")); + } + } + + @Test + public void invalidEndpoint() { + try { + new MarketoHttp("%^%^&%^", "clientNiceId", "clientNiceSecret"); + Assert.fail("This call expected to fail."); + } catch (IllegalArgumentException ex) { + Assert.assertEquals("'%^%^&%^/identity/oauth/token' is invalid URI", ex.getMessage()); + } + } + + void setupToken() { + WireMock.stubFor( + WireMock.get(WireMock.urlPathMatching("/identity/oauth/token")) + .willReturn( + WireMock.aResponse().withBody( + GSON.toJson(new MarketoToken("niceToken", "hello@world.com", "3600", "bearer") + ) + ) + ) + ); + } + + String getApiUrl() { + return String.format("http://localhost:%d", wireMockRule.port()); + } +} diff --git a/suppressions.xml b/suppressions.xml new file mode 100644 index 0000000..600350b --- /dev/null +++ b/suppressions.xml @@ -0,0 +1,33 @@ + + + + + + + + + + + + + + + + + diff --git a/widgets/MarketoReportingPlugin-batchsource.json b/widgets/MarketoReportingPlugin-batchsource.json new file mode 100644 index 0000000..e5390ba --- /dev/null +++ b/widgets/MarketoReportingPlugin-batchsource.json @@ -0,0 +1,77 @@ +{ + "metadata": { + "spec-version": "1.0" + }, + "display-name": "Marketo Reporting", + "configuration-groups": [ + { + "label": "General", + "properties": [ + { + "widget-type": "textbox", + "label": "Reference Name", + "name": "referenceName" + }, + { + "widget-type": "textbox", + "label": "Rest API endpoint", + "name": "restApiEndpoint" + } + ] + }, + { + "label": "Authentication", + "properties": [ + { + "widget-type": "textbox", + "label": "Client ID", + "name": "clientId" + }, + { + "widget-type": "password", + "label": "Client Secret", + "name": "clientSecret" + } + ] + }, + { + "label": "Report", + "properties": [ + { + "widget-type": "select", + "label": "Report Type", + "name": "reportType", + "widget-attributes": { + "default": "leads", + "values": [ + "leads", + "activities" + ] + } + }, + { + "widget-type": "textbox", + "label": "Start Date", + "name": "startDate", + "widget-attributes": { + "placeholder": "Start date in ISO 8601 format(1997-07-16T19:20:30+01:00)" + } + }, + { + "widget-type": "textbox", + "label": "End Date", + "name": "endDate", + "widget-attributes": { + "placeholder": "End date in ISO 8601 format(1997-07-16T19:20:30+01:00)" + } + } + ] + } + ], + "outputs": [ + { + "widget-type": "non-editable-schema-editor", + "schema": {} + } + ] +} \ No newline at end of file