|
1 | 1 | package org.influxdb.impl; |
2 | 2 |
|
3 | | -import java.lang.reflect.Field; |
4 | | -import java.time.Instant; |
5 | | -import java.util.List; |
6 | | -import java.util.concurrent.ConcurrentMap; |
7 | | -import java.util.concurrent.TimeUnit; |
8 | 3 | import org.influxdb.InfluxDB; |
9 | | -import org.influxdb.InfluxDBMapperException; |
10 | | -import org.influxdb.annotation.Column; |
11 | 4 | import org.influxdb.annotation.Measurement; |
12 | 5 | import org.influxdb.dto.Point; |
13 | 6 | import org.influxdb.dto.Query; |
14 | 7 | import org.influxdb.dto.QueryResult; |
15 | 8 |
|
| 9 | +import java.util.List; |
| 10 | + |
16 | 11 | public class InfluxDBMapper extends InfluxDBResultMapper { |
17 | 12 |
|
18 | 13 | private final InfluxDB influxDB; |
@@ -52,91 +47,16 @@ public <T> List<T> query(final Class<T> clazz) { |
52 | 47 |
|
53 | 48 | public <T> void save(final T model) { |
54 | 49 | throwExceptionIfMissingAnnotation(model.getClass()); |
55 | | - cacheMeasurementClass(model.getClass()); |
56 | | - |
57 | | - ConcurrentMap<String, Field> colNameAndFieldMap = getColNameAndFieldMap(model.getClass()); |
58 | | - |
59 | | - try { |
60 | | - Class<?> modelType = model.getClass(); |
61 | | - String measurement = getMeasurementName(modelType); |
62 | | - String database = getDatabaseName(modelType); |
63 | | - String retentionPolicy = getRetentionPolicy(modelType); |
64 | | - TimeUnit timeUnit = getTimeUnit(modelType); |
65 | | - long time = timeUnit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS); |
66 | | - Point.Builder pointBuilder = Point.measurement(measurement).time(time, timeUnit); |
67 | | - |
68 | | - for (String key : colNameAndFieldMap.keySet()) { |
69 | | - Field field = colNameAndFieldMap.get(key); |
70 | | - Column column = field.getAnnotation(Column.class); |
71 | | - String columnName = column.name(); |
72 | | - Class<?> fieldType = field.getType(); |
73 | | - |
74 | | - if (!field.isAccessible()) { |
75 | | - field.setAccessible(true); |
76 | | - } |
77 | | - |
78 | | - Object value = field.get(model); |
79 | | - |
80 | | - if (column.tag()) { |
81 | | - /** Tags are strings either way. */ |
82 | | - pointBuilder.tag(columnName, value.toString()); |
83 | | - } else if ("time".equals(columnName)) { |
84 | | - if (value != null) { |
85 | | - setTime(pointBuilder, fieldType, timeUnit, value); |
86 | | - } |
87 | | - } else { |
88 | | - setField(pointBuilder, fieldType, columnName, value); |
89 | | - } |
90 | | - } |
91 | | - |
92 | | - Point point = pointBuilder.build(); |
| 50 | + Class<?> modelType = model.getClass(); |
| 51 | + String database = getDatabaseName(modelType); |
| 52 | + String retentionPolicy = getRetentionPolicy(modelType); |
| 53 | + Point.Builder pointBuilder = Point.measurementByPOJO(modelType).addFieldsFromPOJO(model); |
| 54 | + Point point = pointBuilder.build(); |
93 | 55 |
|
94 | | - if ("[unassigned]".equals(database)) { |
95 | | - influxDB.write(point); |
96 | | - } else { |
97 | | - influxDB.write(database, retentionPolicy, point); |
98 | | - } |
99 | | - |
100 | | - } catch (IllegalAccessException e) { |
101 | | - throw new InfluxDBMapperException(e); |
102 | | - } |
103 | | - } |
104 | | - |
105 | | - private void setTime( |
106 | | - final Point.Builder pointBuilder, |
107 | | - final Class<?> fieldType, |
108 | | - final TimeUnit timeUnit, |
109 | | - final Object value) { |
110 | | - if (Instant.class.isAssignableFrom(fieldType)) { |
111 | | - Instant instant = (Instant) value; |
112 | | - long time = timeUnit.convert(instant.toEpochMilli(), TimeUnit.MILLISECONDS); |
113 | | - pointBuilder.time(time, timeUnit); |
114 | | - } else { |
115 | | - throw new InfluxDBMapperException( |
116 | | - "Unsupported type " + fieldType + " for time: should be of Instant type"); |
117 | | - } |
118 | | - } |
119 | | - |
120 | | - private void setField( |
121 | | - final Point.Builder pointBuilder, |
122 | | - final Class<?> fieldType, |
123 | | - final String columnName, |
124 | | - final Object value) { |
125 | | - if (boolean.class.isAssignableFrom(fieldType) || Boolean.class.isAssignableFrom(fieldType)) { |
126 | | - pointBuilder.addField(columnName, (boolean) value); |
127 | | - } else if (long.class.isAssignableFrom(fieldType) || Long.class.isAssignableFrom(fieldType)) { |
128 | | - pointBuilder.addField(columnName, (long) value); |
129 | | - } else if (double.class.isAssignableFrom(fieldType) |
130 | | - || Double.class.isAssignableFrom(fieldType)) { |
131 | | - pointBuilder.addField(columnName, (double) value); |
132 | | - } else if (int.class.isAssignableFrom(fieldType) || Integer.class.isAssignableFrom(fieldType)) { |
133 | | - pointBuilder.addField(columnName, (int) value); |
134 | | - } else if (String.class.isAssignableFrom(fieldType)) { |
135 | | - pointBuilder.addField(columnName, (String) value); |
| 56 | + if ("[unassigned]".equals(database)) { |
| 57 | + influxDB.write(point); |
136 | 58 | } else { |
137 | | - throw new InfluxDBMapperException( |
138 | | - "Unsupported type " + fieldType + " for column " + columnName); |
| 59 | + influxDB.write(database, retentionPolicy, point); |
139 | 60 | } |
140 | 61 | } |
141 | | - |
142 | 62 | } |
0 commit comments