Nav

CSV Save Writer Script

The CSV save writer script is what writes the metric data to CSV files.

Run the CSV save writer script once you’ve run the start script.

Near the top of the script, you can configure the directory where the CSV files are written. By default, this is in the Tomcat logs/ directory, but you may configure any absolute filesystem path here. Each invocation of the CSV save writer script collects the JMX metric value(s) from each server in the monitored server group, and writes one CSV file per server JVM.

All data in a single CSV file is sampled at the same period, because dealing with mixed-period data is a more complex problem. If you want to collect data at different periods, duplicate the scripts and configure one copy for each sampling period you would like to use.

Schedule the CSV save writer script in the Tcat console admin shell scheduler so that it runs at the same period that you configured for data collection in the CSV save start script. When configuring the scheduler to run the CSV save writer script, check the Allow Concurrent Execution checkbox.

Currently, the CSV files grow without bound, but you may move, truncate, or delete them at any time, and the console starts a new file. If you have an external log rolling system (such as logrotate on Linux), you can roll these CSV files that way. Otherwise, you can allow the CSV files to grow until you either delete them or move them to another filename.


       
    
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
import com.mulesoft.tcat.Server
import com.mulesoft.common.remoting.RemoteContext
import com.mulesoft.common.server.ServerHealthEvent
import com.mulesoft.tcat.osutil.ReferenceProcessor
import org.mule.galaxy.impl.jcr.JcrItem
import org.mule.galaxy.ee.common.dto.ChartDefinition
import org.mule.galaxy.util.SecurityUtils
import org.mule.galaxy.impl.jcr.JcrUtil
 
import org.springmodules.jcr.JcrCallback
import org.springframework.remoting.RemoteAccessException
import java.io.File
import java.io.FileWriter
import java.util.HashMap
import java.util.concurrent.Callable
import java.util.concurrent.ExecutorCompletionService
import java.util.concurrent.ExecutionException
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.ThreadFactory
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicLong
import java.text.SimpleDateFormat
 
// The below variables are for the user to configure, to set what is to
// be monitored on which server group.
//
// objectNames, attributeNames and trackDeltas are parallel lists: the script
// indexes all three with the same position, so entry i of each list together
// describes one metric (one CSV column). Keep the three lists the same length.

// Id of the server group; first component of the chart name that the script
// looks up for each metric.
def groupId = "401e81bf-b792-49fd-9006-eff59cd29387"
// JMX object names, one per metric.
def objectNames = [ "Catalina:type=GlobalRequestProcessor,name=http-8080",
                    "Catalina:type=ThreadPool,name=http-8080"
                  ]
// JMX attribute to read from the object name at the same position.
def attributeNames = [ "requestCount", "currentThreadsBusy" ]
// Per-metric flag; when true the CSV column header gets a " delta" suffix and
// the flag is part of the chart name that is looked up.
def trackDeltas = [ false, false ]
// Sampling period. It is part of the chart name and of the CSV file name
// ("...-<period>s-period-jmx.csv"), so it is presumably in seconds.
def period = 10
// Directory the CSV files are written to. The backslash stops Groovy from
// interpolating, so the literal text ${catalina.base} is kept and handed to
// the reference resolver below.
def csvSaveDir = "\${catalina.base}/logs"

// Resolve any system property references in csvSaveDir.
// NOTE(review): the meaning of the extra resolveRefs arguments (empty map, the
// two booleans, os.name) is not visible in this script - confirm against
// ReferenceProcessor before changing them.
csvSaveDir = ReferenceProcessor.resolveRefs(csvSaveDir, [:], true, false, System.getProperty("os.name"))
 
// You probably shouldn't change variables below.

// Look up the collaborating beans from the applicationContext that is bound
// into this script.
def appCtx = applicationContext
// Local alias of the bound 'log' so the closures further down can capture it.
def logger = log
def sf = appCtx.getBean("sessionFactory")
def registry = appCtx.getBean("registry")
def serverManager = appCtx.getBean("serverManager")
def statisticsService = appCtx.getBean("v1/statisticsService")
def chartService = appCtx.getBean("chartService")

// All items registered under /Provisioning/Servers.
def servers = registry.getItemByPath("/Provisioning/Servers").items
if (servers) {
    // Randomize the order to have a better throughput with servers 'down' in the list
    Collections.shuffle(servers)
} else {
    // Without servers there is nothing to collect; end the script here.
    if (logger.debugEnabled) {
        logger.debug "No servers found, nothing to ping"
    }
    return
}
 
// Calculate thread pool size.
// Up to one thread per server while there are no more servers than
// processors; beyond that, one thread for every 4 servers, capped at
// 2 threads per processor and never fewer than 1 thread.
def numProcs = Runtime.runtime.availableProcessors()
def numServers = servers.size()
def tpSize = numServers
if (numServers > numProcs) {
    // Use intdiv(): Groovy's '/' on two integers produces a BigDecimal, which
    // is not accepted by Executors.newFixedThreadPool(int, ThreadFactory).
    // Clamp to at least 1 because the quotient is 0 for fewer than 4 servers
    // and a fixed thread pool rejects a size of 0.
    tpSize = Math.max(1, numServers.intdiv(4))
    // But don't let the thread pool size exceed 2 threads per processor.
    tpSize = Math.min(tpSize, numProcs * 2)
}
 
/**
 * Thread factory for the CSV save worker pool. Every thread it creates is
 * placed in a "pool-csvsave" thread group (a child of the group of the thread
 * that constructed the factory) and is named "csvsave-worker-N", where N is a
 * counter starting at 0.
 */
class CSVSaveThreadFactory implements ThreadFactory {
    // Sequence number handed to the next worker thread.
    AtomicLong threadCount = new AtomicLong(0)
    // Be a good citizen, introduce ourselves and bind to a parent.
    ThreadGroup threadGroup = new ThreadGroup(Thread.currentThread().threadGroup, "pool-csvsave")

    /**
     * Creates a new, not yet started, named worker thread for the given task.
     *
     * @param r the task the thread will run
     * @return the new thread, belonging to this factory's thread group
     */
    Thread newThread(Runnable r) {
        String workerName = "csvsave-worker-" + threadCount.getAndIncrement()
        new Thread(threadGroup, r, workerName)
    }
}
 
// Fixed-size worker pool (sized above) that runs the per-server tasks.
ExecutorService exec = Executors.newFixedThreadPool(tpSize, new CSVSaveThreadFactory())
// Wraps the pool so finished tasks can be taken in completion order.
ExecutorCompletionService compService = new ExecutorCompletionService(exec)
// Number of tasks handed to compService so far.
int submittedTasksCount = 0
// Maximum time (seconds) allowed for metric collection.
int timeout = 10
 
// Appends one timestamped sample row for 'server' to that server's CSV file
// in csvSaveDir, writing the header row first when the file is missing or
// empty (for example after it was moved away or truncated).
//
// charts: chart definitions whose lastData values form the row, in column
//         order. When null or empty, nothing is written and a message is
//         printed instead.
// server: the server item; its name (spaces replaced by '-') and the
//         configured period make up the CSV file name.
//
// I/O failures are logged and abandon the current sample; they are never
// rethrown. The writer is always closed, also when a write fails.
def saveData = { List<ChartDefinition> charts, JcrItem server ->
    if (!charts) {
        println "No metric data to store."
        return
    }

    // Wrap a value in double quotes, doubling any embedded double quote so
    // that values containing quotes (JMX object names may) stay one field.
    def csvField = { value -> "\"" + String.valueOf(value).replace("\"", "\"\"") + "\"" }

    def csvFile = new File(new File(csvSaveDir),
        server.name.replace(' ', '-') + "-" + period + "s-period-jmx.csv")
    // File.length() is 0 for a missing file as well as for an empty one.
    def needsHeader = (csvFile.length() == 0L)

    FileWriter writer = null
    try {
        // Open in append mode so earlier samples are kept.
        writer = new FileWriter(csvFile, true)

        // NOTE(review): rows are terminated with a bare "\r", kept as-is so
        // existing files stay consistent - confirm whether "\r\n" was intended.
        if (needsHeader) {
            // Initialize the CSV file: "Time" plus one column per metric.
            def columns = [csvField("Time")]
            objectNames.eachWithIndex { objectName, idx ->
                def title = objectName + " " + attributeNames[idx]
                if (trackDeltas[idx]) title = title + " delta"
                columns << csvField(title)
            }
            writer.append(columns.join(",") + "\r")
        }

        def dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        def fields = [csvField(dateFormat.format(new Date()))]
        charts.each { chart -> fields << csvField(chart.lastData) }
        writer.append(fields.join(",") + "\r")
    } catch (IOException e) {
        logger.error("Failed to write CSV file ${csvFile}", e)
    } finally {
        if (writer != null) {
            try {
                writer.close()
            } catch (IOException e) {
                logger.error("Failed to close CSV file ${csvFile}", e)
            }
        }
    }
}
 
// Submits a task to compService that collects the configured metrics for one
// server and hands them to saveData.
//
// server: the server item to sample.
//
// The task runs a privileged JCR transaction; inside it the actual collection
// is run on a dedicated single-thread executor so that it can be bounded by
// 'timeout' seconds. Unreachable servers and timeouts are reported and treated
// as "no data"; any other collection failure is logged. The task itself
// always yields null.
// Side effects: one task submitted, submittedTasksCount incremented.
def csvSave = { JcrItem server ->
    def Callable task = {
        def jcrTransactionCallback = { session ->
            RemoteContext.setServerId(server.id)
            def executor = Executors.newSingleThreadExecutor()
            try {
                // Give 'timeout' maximum seconds for metric collection
                // call to succeed.
                def serverId = server.id

                def Callable getStatCallable = {
                    SecurityUtils.doPrivileged({
                        JcrUtil.doInTransaction(sf, { session2 ->

                            // Collect metric values for this server. The
                            // server id is set again because this runs on the
                            // executor's own thread.
                            RemoteContext.setServerId(serverId)
                            def charts = chartService.getPortletCharts("csvsave")
                            def chartsForThisServer = []
                            for (int i = 0; i < objectNames.size(); i++) {
                                // Chart name under which this metric is looked up.
                                def jmxCollectorInfoId = groupId + "|" + period + "|" + serverId + "|" +
                                    objectNames[i] + "|" + attributeNames[i] + "|" + trackDeltas[i]
                                chartsForThisServer.addAll(charts.findAll { chart ->
                                    chart.name == jmxCollectorInfoId
                                })
                            }
                            saveData(chartsForThisServer, server)

                        } as JcrCallback)
                    })
                } as Callable

                try {
                    executor.submit(getStatCallable).get(timeout, TimeUnit.SECONDS)
                } catch (ExecutionException e) {
                    // Extracts real exception thrown from ExecutionException then
                    // InvokerInvocationException. Fall back to the nearest
                    // available throwable when the chain is shorter, so that
                    // null is never thrown.
                    throw (e.cause?.cause ?: e.cause ?: e)
                }
                // The collection completed successfully.
            } catch (RemoteAccessException e) {
                if (logger.debugEnabled) {
                    logger.debug("Unreachable server ${server.name}: ${e.message}")
                }
                println "Unreachable server ${server.name}: ${e.message}"
                // saveData takes (charts, server); both must be passed.
                saveData(Collections.emptyList(), server)
                return null
            } catch (TimeoutException e) {
                if (logger.debugEnabled) {
                    logger.debug("Unreachable server after ${timeout} seconds ${server.name}: ${e.message}")
                }
                println "TimeoutException, server ${server.name}"
                saveData(Collections.emptyList(), server)
                return null
            } catch (Exception e) {
                println "Failed to collect metric from server ${server.name} ${e.message}"
                logger.error("Failed to collect metric from server ${server.name}", e)
            } finally {
                // Also stops a collection call that is still running after a timeout.
                executor.shutdownNow()
            }
        } as JcrCallback

        SecurityUtils.doPrivileged({
            JcrUtil.doInTransaction(sf, jcrTransactionCallback)
        })

        return null

    } as Callable

    compService.submit task
    submittedTasksCount++

    if (logger.debugEnabled) {
        logger.debug "Submitted tasks: $submittedTasksCount"
    }
}
 
// Main driver: submit one collection task per reachable server, wait for all
// submitted tasks, then shut the worker pool down. The script evaluates to
// "Success!" unless a task failed, in which case the first failure is rethrown
// after every task has been waited for.
try {
    // Invoke metric value collection and storage, each server in a thread.
    servers.each { server ->

        // If the server is unreachable by the console, no task is submitted;
        // saveData is called with no charts, which reports that there is no
        // metric data for this sample.
        if (!serverManager.isServerUp(server.id)) {
            saveData(Collections.emptyList(), server)
            return null
        }

        if (logger.debugEnabled) {
            logger.debug "Saving CSV stats for server ${server.name}"
        }
        csvSave(server)
    }

    // Wait for every submitted task, in completion order. A failed task must
    // not stop us from waiting for the remaining ones, so the first failure is
    // remembered and rethrown only after the loop.
    def int tasksLeft = submittedTasksCount
    def firstFailure = null
    submittedTasksCount.times {
        // Ensure all submitted tasks complete.
        def result = null
        try {
            result = compService.take().get()
        } catch (ExecutionException e) {
            logger.error("CSV save task failed", e)
            if (firstFailure == null) {
                firstFailure = e
            }
        }
        tasksLeft--
        if (logger.debugEnabled) {
            logger.debug "Done with server ${result}. ${tasksLeft} task(s) remaining"
        }
    }
    if (firstFailure != null) {
        throw firstFailure
    }
} finally {
    if (logger.debugEnabled) {
        logger.debug "Shutting down thread pool"
    }
    // Orderly shutdown first; force it if workers have not finished in 5 seconds.
    exec.shutdown()
    if (!exec.awaitTermination(5, TimeUnit.SECONDS)) {
        exec.shutdownNow()
    }
    if (logger.debugEnabled) {
        logger.debug "Thread pool shutdown complete"
    }
}
"Success!"